"""
The main analysis process of the Spark application
"""
from functools import partial
from operator import add

# Load stopwords from the dataset
with open('stopwords.txt', 'r') as words:
    stopwords = frozenset([
        word.strip() for word in words.read().split("\n")
    ])

# Broadcast the stopwords to the cluster
stopwords = sc.broadcast(stopwords)
# Phase one: tokenize and compute document frequency
# N.B. this assumes a corpus of (docid, text) pairs
docfreq = corpus.flatMap(partial(tokenize, stopwords=stopwords))
docfreq = docfreq.reduceByKey(add)
# Phase two: compute term frequency, then perform a keyspace change
# ((docid, word), tf) --> (word, (docid, tf, 1))
trmfreq = docfreq.map(lambda kv: (kv[0][1], (kv[0][0], kv[1], 1)))
trmfreq = trmfreq.reduceByKey(term_frequency)
# (word, (docid, tf, n)) --> ((word, docid), (tf, n))
trmfreq = trmfreq.map(
    lambda kv: ((kv[0], kv[1][0]), (kv[1][1], kv[1][2]))
)
# Phase three: compute the TF-IDF of each (word, document) pair
tfidfs ...
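
The listing relies on a tokenize function and a term_frequency reducer defined elsewhere in the chapter, and its third phase is truncated above. As a rough sketch only, assuming a simple whitespace tokenizer and the conventional tf * log(N/n) weighting (neither is the book's exact code; the tokenize body and the phase-three computation shown here are assumptions), the missing pieces might look like this:

import math
import string

def tokenize(document, stopwords=None):
    """
    Hypothetical tokenizer: emits ((docid, word), 1) for every token
    that is not a stopword. Because stopwords is a broadcast
    variable, the actual set is read via stopwords.value on workers.
    """
    docid, text = document
    for token in text.lower().split():
        token = token.strip(string.punctuation)
        if token and token not in stopwords.value:
            yield ((docid, token), 1)

# Phase three under the standard weighting tfidf = tf * log(N / n),
# where N is the corpus size and n is the word's document frequency.
N = corpus.count()
tfidfs = trmfreq.map(
    lambda kv: (kv[0], kv[1][0] * math.log(N / kv[1][1]))
)

Note that reduceByKey requires term_frequency to be an associative function over the (docid, tf, n) values; its body is not reproduced here, since only the chapter's own definition is authoritative.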