[雪峰磁针石博客] Machine Learning with PySpark (Natural Language Processing and Recommender Systems) 2: Data Processing, Part 2 ...
Published: 2019-06-22


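User-Defined Functions (UDFs)

UDFs are widely used in data processing to transform DataFrames. PySpark has two types of UDF: conventional UDFs and Pandas UDFs. Pandas UDFs are usually much faster, because they operate on whole batches of values as pandas objects instead of calling a Python function once per row.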

  • Traditional Python function
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> def price_range(brand):
...     # Map each brand to a price band; unlisted brands default to "Low Price"
...     prices = {"Samsung": "High Price", "Apple": "High Price", "MI": "Mid Price"}
...     return prices.get(brand, "Low Price")
...
>>> brand_udf = udf(price_range, StringType())
>>> df.withColumn('price_range', brand_udf(df['mobile'])).show(10, False)
+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|High Price |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|High Price |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows
  • Lambda function
>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10, False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows
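One caveat with the lambda UDF: if the age column contains nulls, Spark passes None to the Python function, and in Python 3 the comparison None <= 30 raises a TypeError. Below is a minimal null-safe sketch, assuming the same df with an age column. The names age_group and add_age_group are hypothetical and do not come from the original post.

```python
def age_group(age):
    """Map an age to a label; return None when the age is missing."""
    if age is None:
        return None  # Spark stores a returned None as null
    return "young" if age <= 30 else "senior"


def add_age_group(df):
    """Return df with an extra age_group column (df must have an age column)."""
    # Imported here so the plain function above can be tried without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    age_udf = udf(age_group, StringType())
    return df.withColumn("age_group", age_udf(df["age"]))
```

Calling add_age_group(df).show(10, False) should give the same labels as the lambda version for non-null ages, and null where the age is missing.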


  • Pandas UDF (vectorized UDF)

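There are two types of Pandas UDF: Scalar and GroupedMap.

Using a Pandas UDF is very similar to using a basic UDF. We first import pandas_udf from PySpark and then apply it to whichever column we want to transform.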

>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     return (100-age)
...
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10, False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows
  • Pandas UDF (multiple columns)
>>> from pyspark.sql.types import DoubleType
>>> def prod(rating, exp):
...     return rating*exp
...
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product", prod_udf(df['ratings'], df['experience'])).show(10, False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows
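Both Pandas UDF examples here are of the Scalar type; the GroupedMap type is named but not demonstrated. The following is a rough sketch of what a GroupedMap UDF could look like in the Spark 2.3 style. It assumes that ratings, age, and family are integer columns, experience is a double, and mobile is a string, which this post does not state explicitly. The names GROUP_SCHEMA, center_experience, add_centered_experience, and the exp_centered column are hypothetical.

```python
# Output schema as a DDL string: the assumed input columns of the sample
# DataFrame plus one new column (exp_centered is a hypothetical name).
GROUP_SCHEMA = (
    "ratings int, age int, experience double, "
    "family int, mobile string, exp_centered double"
)


def center_experience(pdf):
    """Receive one group as a pandas DataFrame and return a pandas DataFrame.

    Adds exp_centered: experience minus the mean experience of the group.
    """
    return pdf.assign(exp_centered=pdf["experience"] - pdf["experience"].mean())


def add_centered_experience(df):
    """Apply center_experience to each group of rows sharing the same mobile."""
    # Imported here so the module can be loaded without Spark installed.
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    grouped_udf = pandas_udf(
        center_experience, GROUP_SCHEMA, PandasUDFType.GROUPED_MAP
    )
    return df.groupby("mobile").apply(grouped_udf)
```

Unlike a Scalar UDF, which maps columns to a column of the same length, a GroupedMap UDF receives all rows of a group at once and may return a DataFrame of any length, as long as it matches the declared schema. In Spark 3.0 and later this style is deprecated in favor of groupby(...).applyInPandas(...).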

Dropping duplicate values

>>> df.count()
33
>>> df=df.dropDuplicates()
>>> df.count()
26

Dropping a column

>>> df_new=df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows


Writing data

  • CSV

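If we want to save the DataFrame in raw CSV format as a single file, we can use Spark's coalesce function to reduce it to one partition before writing. Note that Spark treats the target path as a directory and writes a single part file inside it.

>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header","true").save(write_uri)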
  • Parquet

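If the dataset is large and has many columns, we can compress it and convert it to the Parquet file format. This reduces the overall size of the data and improves performance during processing, because Parquet is a columnar format and a query can read only the required subset of columns instead of the entire dataset.

We can easily convert the DataFrame and save it in Parquet format.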
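The post does not show the Parquet call itself, so here is a minimal sketch using the same DataFrameWriter API as the CSV example. The helper names save_as_parquet and load_columns, and any path passed to them, are hypothetical.

```python
def save_as_parquet(df, path):
    """Write a Spark DataFrame to `path` in Parquet format."""
    # Same writer API as the CSV example, with the format switched to Parquet.
    # Spark creates a directory at `path` holding one or more part files.
    df.write.format("parquet").save(path)


def load_columns(spark, path, columns):
    """Read back only the listed columns from a Parquet dataset.

    `spark` is an existing SparkSession; `columns` is a list of column names.
    """
    return spark.read.parquet(path).select(*columns)
```

For example, save_as_parquet(df, '/home/andrew/test.parquet') followed by load_columns(spark, '/home/andrew/test.parquet', ['age', 'mobile']) would write the data and then read back just two of its columns, which is where the columnar layout pays off.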

Note: the complete dataset and code are available in the book's GitHub repository, and they run best on Spark 2.3 and later.


Reposted from: http://wfkia.baihongyu.com/
