
308
|
第
12
章
+---+
| id|
+---+
| 1|
+---+
通过设置
spark.sql.execution.arrow.maxRecordsPerBatch
,可以控制
pandas.DataFrame
的
大小。要注意,输入大小和输出大小不一定相同,这和大多数
Pandas UDF
有所不同。
协同分组中的所有数据都会加载到内存中,这意味着,如果存在数据倾斜或
者某些分组大到内存放不下,那么可能会遇到内存不足的问题。
Spark 3.0
还引入了协同分组映射的
Pandas UDF
。函数
applyInPandas()
接受两个有共同键
的
pandas.DataFrame
,并将函数用在每个协同分组上。返回的
pandas.DataFrame
再组合成
单个
DataFrame
。与
mapInPandas()
一样,返回的
pandas.DataFrame
的长度没有限制。以
下是一个示例。
df1 = spark.createDataFrame(
[(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(1201, 1, "x"), (1201, 2, "y")], ("time", "id", "v2"))
def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> ...