Spark和Pandas结合使用.md
本篇文章主要讲解spark和pandas结合应用的一个例子,我之前在工作中总是单纯的使用spark,有时候会将DataFrame转换为临时表,然后使用Hive-sql处理,或者是写Udf做稍微复杂一些的处理。在前段时间接触到spark可以和pandas结合使用,还真是又涨了点知识。
举一个例子,一个DataFrame的size是[m,n],我想对其进行groupby操作,然后返对每个分组内的上下两个row进行一些操作,最后返回一个和和原DataFrame大小一致的新df,最初我想到的一个方案是对组内每两行先打一个相同的tag,然后再结合window进行操作,不过这种方案比较麻烦,不直观,而且对每两行之间做一些复杂的运算可能也不是很友好。或者另一个场景,一个df,第一列是id,第二列是不同的品牌,想要统计某个id下各个品牌的数量(可以有重复),这时一种常规的方案是对每个品牌进行映射到一个新的列,然后若这一行是品牌1,则打标记为1,其它为0,品牌2的列同理,这样处理起来略显麻烦,pandas就可以很简单的处理。
再举一个例子,我想对df分组,计算组内分位数,但是我要求分位数必须是组内出现的数字,而不是插值之后的数字,并且还要对一些异常值进行判断。你可能会想到使用hivesql中的percentile来做,但是它返回的未必是组内的值,而且对异常值判断不是很方便。
对于上面两个例子,单纯的使用spark和hivesql都不是最优雅的方案,这时候pandas就出现了,它可以很好地解决这两类问题。
一、环境介绍
简单介绍下我所应用的环境:首先是要安装pyarrow这个库,其次python版本不要高于3.7,最后spark环境,我之前使用的是2.4,一直报错,折腾了好久,后来发现是环境问题,切换到3.2就可以了。
1 |
|
二、Spark与Pandas结合
spark和pandas结合其实就是把一部分sparkdataframe,转换为pandas dataframe,然后可以比较方便的进行数值计算,就像写单机pandas操作一样。
我们需要用到 pyspark.sql.functions import pandas_udf, PandasUDFType,这两个函数,顾名思义,pandas_udf就是用户自定义的pandas函数
1 |
|
pandas_udf就三个参数:
f:即用户自定义的func
returnType:自定义的func返回的值类型
functionType:枚举值,包含下面四种方式,表示的是我们的函数是按照什么样的方式进行映射,即返回值和输入是怎样的映射关系:
SCALAR: PandasScalarUDFType :标量,即返回一个值 SCALAR_ITER: PandasScalarIterUDFType :迭代器,这个还需要我探索一下 GROUPED_MAP: PandasGroupedMapUDFType :分组映射,分组之后返回df,可以和原df大小一致,也可不一致,用户自己可以控制 GROUPED_AGG: PandasGroupedAggUDFType :分组聚合,分组之后返回一个常量值
今天主要介绍一下GROUPED_MAP和GROUP_AGG的用法。
方法 | 输入 | 输出 | 配合使用方式 | ||
---|---|---|---|---|---|
GROUPED_MAP | Datafram | Dataframe | Apply | ||
GROUPED_AGG | 一列或多列 | 常量 | agg | ||
1、GROUPED_AGG
就拿上面那个计算组内分位数的例子来做,
1 |
|
2、GROUPED_MAP
就拿上面计算每个id下不同品牌的数量的例子来看:
1 |
|
三、总结
好了,这就是本篇文章对pyspark和pandas一起应用的一个例子,分别介绍了GROUP_AGG和GROUP_MAP两个场景,后续还需要再补充一个SCALAR_ITER的应用,希望能对您有用。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!