过滤 Spark 数据集的四种方法
在实际工作中,根据某个字段,对一个 Spark 数据集进行过滤,是一个很常见的场景,举个例子:
一个存储公司员工信息的数据集 A,有以下三个字段:
现在要过滤出某些员工的 id,这些 id 在 B 集合(B 可能是哈希表,也可能是 Spark 数据集)中,过滤逻辑为:
有四种方法可以实现,分别为:
Filter
Map
MapPartition
Inner Join
下面详细介绍。
Filter
Spark 的 Filter 变换,可以根据条件表达式、返回布尔值的过滤函数、条件字符串,对数据集进行过滤,使用方法如下:
Filter 变换比较简单,逐条处理记录不论数据集大小,效率都很高,但需要能够将用来过滤的数据集 B 广播到所有的 executor 上。
Map
Map 变换,对数据集中每条记录调用一个函数,返回值可以是 null,也可以是相同类型或不同类型的新记录,使用方法如下:
通过 Map 变换实现过滤的话,只需要在 Map 变换中,将符合条件的记录原样返回,不符合条件的记录返回 null 即可。
可以看到,Map 变换的语义和 Filter 变换的语义相似,都是逐条处理记录,但 Map 需要提供一个额外的 Encoder,故没有 Filter 简单和优雅,且因为输出要过滤 null 值,所以效率不如 Filter。
MapPartitions
MapPartitions 变换,与 Map 变换类似,但映射函数不是在每条记录上调用,而是在分区级别调用,使用方法如下:
MapPartitions 在分区级别进行操作,而不是记录级别,因此比 Filter 和 Map 效率更高。缺点的话,首先和 Map 一样,需要提供一个额外的 Encoder,此外,当分区过大,超过 executor 所能提供的内存时,任务会失败,因此可靠性不如 Map 和 Filter。
Inner Join
以员工 id 相等为 Inner Join 的条件,然后只要 A 集合中的字段,同样可以实现过滤,使用方法:
Inner Join 和 Filter 一样,效率和可靠性都有保证,且对 B 集合的类型和大小都没有偏好。
总结
在本文描述的过滤场景中,综合考虑效率和可靠性,如果用来过滤的集合比较小,可以广播到所有的 executor 中,那么选择 Filter 变换为佳,如果 B 集合很大,则 Inner Join 更合适。
参考链接:
https://towardsdatascience.com/four-ways-to-filter-a-spark-dataset-against-a-collection-of-data-values-7d52625bcf20
https://spark.apache.org/docs/latest/api/java/index.html
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html
公众号:大数志
传递最新、最有价值的大数据技术干货和资讯。
版权声明: 本文为 InfoQ 作者【大数志】的原创文章。
原文链接:【http://xie.infoq.cn/article/d9ac03e4ee73b70e8e9fd9e08】。
本文遵守【CC BY-NC-SA】协议,转载请保留原文出处及本版权声明。
评论