
分布式分析和模式
|
95
for prefix, path in (('#', 'hashtags.txt'), ('@', 'handles.txt')):
with open(path, 'r') as f:
for word in f:
bloom.add(prefix + word.strip())
本示例创建了一个有
100
万元素、错误率为
0.1
的布隆过滤器。它在底层使用这些参数来
选择最优数 k(所需散列函数的数量),以保证给定容量下的错误阈值。性能和空间也存在
折中——容量越小且错误率越低,需要的散列函数就越多,计算也就越慢;容量越大,布
隆过滤器就必须越大。从磁盘文件读取标签和
Twitter
句柄(并给它们加上适当的前缀)
后,布隆过滤器将被写入磁盘上一个名为
twitter.bloom
的文件中。
在
Spark
中使用它:
ELEMS = re.compile(r'[#@][\w\d]+')
def tweet_filter(tweet, bloom=None):
for elem in ELEMS.findall(tweet['text']):
if elem in bloom.value:
return True
# 从磁盘加载布隆过滤器,进行并行化
bloom = sc.broadcast(BloomFilter.open('twitter.bloom'))
# 从磁盘加载JSON推文,进行解析
tweets = sc.textFile('tweets').map(json.loads) ...