UDFs are widely used in data processing to transform DataFrames. There are two types of UDFs available in PySpark: conventional UDFs and Pandas UDFs. Pandas UDFs are much more powerful in terms of speed and processing time.
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> def price_range(brand):
...     prices = {"Samsung": 'High Price', "Apple": 'High Price', "MI": 'Mid Price'}
...     return prices.get(brand, "Low Price")
...
>>> brand_udf = udf(price_range, StringType())
>>> df.withColumn('price_range', brand_udf(df['mobile'])).show(10, False)
+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|High Price |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|High Price |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows
>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10, False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows
There are two types of Pandas UDFs: Scalar and GroupedMap.
Using a Pandas UDF is quite similar to using a basic UDF. We first have to import pandas_udf from PySpark and apply it to any particular column to be transformed.
>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     return (100 - age)
...
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10, False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows
>>> from pyspark.sql.types import DoubleType
>>> def prod(rating, exp):
...     return rating * exp
...
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product", prod_udf(df['ratings'], df['experience'])).show(10, False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows
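The two examples above are Scalar Pandas UDFs, which operate column-wise on pandas Series. For the GroupedMap type mentioned earlier, here is a minimal sketch, assuming Spark 2.3+ with pyarrow installed and the same df as above; the function name center_experience is made up for illustration. It mean-centers the experience column within each mobile group:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# GroupedMap sketch: each call receives a pandas DataFrame holding all rows
# of one 'mobile' group and must return a pandas DataFrame that matches the
# declared schema (here, the input DataFrame's own schema).
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def center_experience(pdf):
    pdf['experience'] = pdf['experience'] - pdf['experience'].mean()
    return pdf

df.groupby('mobile').apply(center_experience).show(5, False)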
Next, we can drop the duplicate records from the DataFrame with dropDuplicates and check the row count before and after.

>>> df.count()
33
>>> df = df.dropDuplicates()
>>> df.count()
26
To drop a column from the DataFrame, we can use the drop function.

>>> df_new = df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows
If we want to save the DataFrame as a single file in its original csv format, we can use the coalesce function in Spark.
>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header", "true").save(write_uri)
If the dataset is huge and involves a lot of columns, we can choose to compress it and convert it into a Parquet file format. This reduces the overall size of the data and optimizes performance while processing it, because it works on the subset of required columns instead of the entire data.
We can easily convert and save the DataFrame into Parquet format. Note: the complete dataset, along with the code, is available for reference in the GitHub repository for this book, and it runs best on Spark 2.3 and higher versions.
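As a rough sketch of the Parquet conversion described above (the output path parquet_uri is made up for this example), writing the DataFrame to Parquet takes a single line:

# Hypothetical output location; any writable path works.
parquet_uri = '/home/andrew/test_parquet'
df.write.format("parquet").save(parquet_uri)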
Source: http://wfkia.baihongyu.com/