Data Preparation
Data download: The Complete Works of William Shakespeare
Let's start by looking at the raw data: load the file into a DataFrame, then display its first 15 rows.
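# sqlContext is assumed to be provided by the environment (e.g. the pyspark shell or a notebook)
fileName = "shakespeare.txt"

# read.text yields one row per line of the file, in a single string column named 'value'
shakespeareDF = sqlContext.read.text(fileName)
shakespeareDF.show(15, truncate=False)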
The output is:
+-------------------------------------------------------+
|value                                                  |
+-------------------------------------------------------+
|1609                                                   |
|                                                       |
|THE SONNETS                                            |
|                                                       |
|by William Shakespeare                                 |
|                                                       |
|                                                       |
|                                                       |
|  1                                                    |
|  From fairest creatures we desire increase,           |
|  That thereby beauty's rose might never die,          |
|  But as the riper should by time decease,             |
|  His tender heir might bear his memory:               |
|  But thou contracted to thine own bright eyes,        |
|  Feed'st thy light's flame with self-substantial fuel,|
+-------------------------------------------------------+
Data Cleaning
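The raw text contains punctuation, a mix of upper- and lowercase letters, and blank lines, so it needs cleaning before any words can be counted. The removePunctuation function below strips punctuation, trims leading and trailing spaces, and converts everything to lowercase. Blank lines are handled later, when the lines are split into words.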
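from pyspark.sql.functions import regexp_replace, trim, lower

def removePunctuation(column):
    # Delete every character that is neither a word character ([A-Za-z0-9_]) nor whitespace,
    # then trim spaces at both ends and convert the result to lowercase
    return lower(trim(regexp_replace(column, r'[^\w\s]', '')))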
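The regular expression does most of the work here. To check what it keeps and what it drops without starting Spark, the same three steps can be sketched in plain Python. This is an approximation, not Spark's implementation. It assumes that compiling the pattern with re.ASCII mirrors the default meaning of \w in Spark's Java regex engine ([A-Za-z0-9_]). It also uses strip(' ') because Spark's trim removes only spaces. The helper name remove_punctuation_py is hypothetical.

```python
import re

# Hypothetical pure-Python counterpart of removePunctuation (not part of PySpark).
# re.ASCII restricts \w to [A-Za-z0-9_], like Java's default \w.
_NON_WORD = re.compile(r'[^\w\s]', flags=re.ASCII)


def remove_punctuation_py(text: str) -> str:
    # 1. delete every character that is neither a word character nor whitespace
    # 2. trim spaces at both ends (Spark's trim only removes ' ')
    # 3. lowercase the result
    return _NON_WORD.sub('', text).strip(' ').lower()


line = "  Feed'st thy light's flame with self-substantial fuel,"
print(remove_punctuation_py(line))
# feedst thy lights flame with selfsubstantial fuel
```

Apostrophes and hyphens are deleted rather than replaced by spaces, so self-substantial becomes selfsubstantial. The cleaned Shakespeare output in this article shows the same effect.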
To see how removePunctuation behaves, let's start with a small example.
sentenceDF = sqlContext.createDataFrame(
    [('Hi, you!',),
     (' No under_score!',),
     (' *      Remove punctuation then spaces  * ',)],
    ['sentence'])
sentenceDF.show(truncate=False)
The original DataFrame looks like this:
+------------------------------------------+
|sentence                                  |
+------------------------------------------+
|Hi, you!                                  |
| No under_score!                          |
| *      Remove punctuation then spaces  * |
+------------------------------------------+
Next, clean it with removePunctuation:
from pyspark.sql.functions import col

(sentenceDF
 .select(removePunctuation(col('sentence')).alias('sentence'))
 .show(truncate=False))
After cleaning, the DataFrame looks like this:
+------------------------------+
|sentence                      |
+------------------------------+
|hi you                        |
|no under_score                |
|remove punctuation then spaces|
+------------------------------+
With this function we can now clean The Complete Works of William Shakespeare: load shakespeare.txt into a DataFrame and apply removePunctuation to its value column.
from pyspark.sql.functions import col

fileName = "shakespeare.txt"
shakespeareDF = (sqlContext
                 .read
                 .text(fileName)
                 .select(removePunctuation(col('value')).alias('value')))
shakespeareDF.show(15, truncate=False)
The cleaned DataFrame looks like this:
+-------------------------------------------------+
|value                                            |
+-------------------------------------------------+
|1609                                             |
|                                                 |
|the sonnets                                      |
|                                                 |
|by william shakespeare                           |
|                                                 |
|                                                 |
|                                                 |
|1                                                |
|from fairest creatures we desire increase        |
|that thereby beautys rose might never die        |
|but as the riper should by time decease          |
|his tender heir might bear his memory            |
|but thou contracted to thine own bright eyes     |
|feedst thy lights flame with selfsubstantial fuel|
+-------------------------------------------------+
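Next, split breaks each line into an array of words, and explode turns each array element into its own row. The result is a DataFrame with one word per row. Finally, where removes the empty strings left behind by blank lines and by consecutive spaces.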
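from pyspark.sql.functions import split, explode

shakeWordsDF = (shakespeareDF
                # split on single spaces, then emit one row per array element
                .select(explode(split(shakespeareDF.value, ' ')).alias('word'))
                # drop the empty strings produced by blank lines and repeated spaces
                .where("word <> ''"))
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print(shakeWordsDFCount)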
The resulting DataFrame (first 20 rows):
+-----------+
|       word|
+-----------+
|       1609|
|        the|
|    sonnets|
|         by|
|    william|
|shakespeare|
|          1|
|       from|
|    fairest|
|  creatures|
|         we|
|     desire|
|   increase|
|       that|
|    thereby|
|    beautys|
|       rose|
|      might|
|      never|
|        die|
+-----------+
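The split, explode and where chain can be mirrored in plain Python to show what happens row by row. In this sketch, explode_words is a hypothetical stand-in for the Spark chain. Spark's split treats ' ' as a regular expression that matches a single space. For this simple pattern it should behave like Python's str.split(' '), so blank lines and runs of spaces produce empty strings that have to be filtered out.

```python
from typing import Iterable, List


def explode_words(lines: Iterable[str]) -> List[str]:
    # Hypothetical plain-Python analogue of
    #   select(explode(split(value, ' ')).alias('word')).where("word <> ''")
    words: List[str] = []
    for line in lines:
        # split on every single space: '' and 'a  b' both yield empty strings
        for word in line.split(' '):
            # explode emits one row per element; the where clause drops empty ones
            if word != '':
                words.append(word)
    return words


cleaned = ['1609', '', 'the sonnets', '', 'by william shakespeare']
print(explode_words(cleaned))
# ['1609', 'the', 'sonnets', 'by', 'william', 'shakespeare']
```

Calling line.split() with no argument would drop the empty strings on its own. However, it also splits on tabs and other whitespace, which the Spark code above does not do.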
Counting Words
To count the words, the wordCount function below groups the rows by word, counts the rows in each group, and returns a DataFrame with word and count columns.
def wordCount(wordListDF):
    # One row per distinct word; count() adds a 'count' column holding the group size
    return wordListDF.groupBy('word').count()
First, here is wordCount on a small example:
wordsDF = sqlContext.createDataFrame(
    [('cat',), ('elephant',), ('rat',), ('rat',), ('cat',)],
    ['word'])
wordCount(wordsDF).show()
The DataFrame returned by wordCount:
+--------+-----+
|    word|count|
+--------+-----+
|     cat|    2|
|     rat|    2|
|elephant|    1|
+--------+-----+
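groupBy('word').count() is the DataFrame form of a classic hash-based tally. The plain-Python sketch below applies collections.Counter to the same toy word list. The name word_count is hypothetical and not part of PySpark.

```python
from collections import Counter
from typing import Dict, Iterable


def word_count(words: Iterable[str]) -> Dict[str, int]:
    # Hypothetical analogue of wordListDF.groupBy('word').count():
    # one entry per distinct word, mapped to the number of occurrences
    return dict(Counter(words))


words = ['cat', 'elephant', 'rat', 'rat', 'cat']
print(word_count(words))
# {'cat': 2, 'elephant': 1, 'rat': 2}
```

One difference is worth noting. The row order of a Spark groupBy result is not guaranteed, so cat, rat and elephant could come back in a different order. The dict here keeps the order in which each word first appears.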
Next, use wordCount to count the words in The Complete Works of William Shakespeare, and sort the result by the count column in descending order.
from pyspark.sql.functions import desc

topWordsAndCountsDF = wordCount(shakeWordsDF).orderBy(desc('count'))
topWordsAndCountsDF.show()
The sorted DataFrame (top 20 rows):
+----+-----+
|word|count|
+----+-----+
| the|27361|
| and|26028|
|   i|20681|
|  to|19150|
|  of|17463|
|   a|14593|
| you|13615|
|  my|12481|
|  in|10956|
|that|10890|
|  is| 9134|
| not| 8497|
|with| 7771|
|  me| 7769|
|  it| 7678|
| for| 7558|
|  be| 6857|
| his| 6857|
|your| 6655|
|this| 6602|
+----+-----+
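orderBy(desc('count')) sorts on count alone, so words with equal counts have no defined relative order. For example, be and his both appear 6857 times above. If a stable ranking matters, a secondary key can break the tie; in Spark that would look something like wordCount(shakeWordsDF).orderBy(desc('count'), 'word'). The plain-Python sketch below performs the same two-key sort. It uses a hypothetical top_words helper and a few counts taken from the table above.

```python
from typing import Dict, List, Tuple


def top_words(counts: Dict[str, int], n: int) -> List[Tuple[str, int]]:
    # Sort by count descending, then by word ascending so that ties are
    # broken deterministically (an assumption; the Spark code above sorts by count only)
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:n]


sample = {'his': 6857, 'be': 6857, 'the': 27361, 'your': 6655}
print(top_words(sample, 3))
# [('the', 27361), ('be', 6857), ('his', 6857)]
```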
Summary
As the table shows, most of the highest-frequency words are stop words, such as the, and, i, to and of.
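Since stop words dominate the top of the list, a common next step is to remove them before counting. Spark ML provides a StopWordsRemover transformer (in pyspark.ml.feature) that works on array columns, so it would fit between split and explode. The plain-Python sketch below only illustrates the idea. STOP_WORDS is a tiny hypothetical set picked from the table above rather than a standard stop-word list, and the input sentence is made up for the example.

```python
from collections import Counter
from typing import Iterable, List, Set, Tuple

# Hypothetical, deliberately tiny stop-word set, for illustration only
STOP_WORDS: Set[str] = {'the', 'and', 'i', 'to', 'of', 'a', 'my', 'in', 'that'}


def top_content_words(words: Iterable[str], n: int) -> List[Tuple[str, int]]:
    # Drop stop words first, then count the rest and rank by count (ties by word)
    counts = Counter(w for w in words if w not in STOP_WORDS)
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


words = 'the rose and the rose of my love and i love the rose'.split()
print(top_content_words(words, 2))
# [('rose', 3), ('love', 2)]
```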