
Ray Core
入门
|
39
Ray
的基础设施时,我们将详细讨论节点之间和工作节点内部如何传递值。虽
然与对象存储的交互需要一些额外开销,但在处理更大、更真实的数据集时,
却可以提升性能。目前,这一步在真正的分布式环境中是必不可少的。如果你
愿意的话,可以用
retrieve_task
函数重新运行示例
2-1
,并确认它是否按预期
运行。
使用
Ray
的
wait
函数进行非阻塞调用
在示例
2-1
中,我们使用
ray.get(object_references)
访问结果。这个调用是
阻塞的,也就是说,驱动程序必须等待所有结果可用。在本示例中,这并不是
什么大问题,因为程序在
1s
内就能完成。但是,如果处理每个数据库都需要几
分钟的时间,就难以接受了。在这种情况下,我们希望释放驱动程序进程以便
处理其他任务,而不是闲着无所事事。另外,最好能够在结果到达时立即进行
处理(有些结果比其他结果更快完成),而不是等待所有项都被处理完。还有一
个需要注意的问题,如果数据库中的某个项无法按照预期检索出来,会发生什
么?假设数据库连接中出现了死锁,驱动程序将会被挂起,永远无法检索出所
有的项。因此,使用合理的超时时间是个好主意。我们假设停止任务之前等待
的时间不能超过最长数据检索任务的
10
倍。如下所示是使用
Ray
的
wait
函数
的实现方法:
start = time.time()
object_references = [
retrieve_task.remote(item, db_object_ref) for item in range(8) ...