pyspark 日常整理
1 联表
df1.join(df2,连接条件,连接方式)
如:df1.join(df2,[df1.a==df2.a], "inner").show()
连接方式:字符串类型, 如 "left" , 常用的有:inner, cross, outer, full, full_outer, left, left_outer, right, right_outer
连接条件: df1["a"] == df2["a"] 或 "a" 或 df1.a == df2.a , 如有多个条件的情况 如,[df1["a"] == df2["a"] ,df1["b"] == df2["b"] ] 或 (df.a > 1) & (df.b > 1)
需要注意的:
如果使用 "a" 进行连接,则会自动合并相同字段,只输入一个。如 df1.join(df2,"a","left") 只输出df1的 a字段,df2 的 a 字段是去掉了。
2 udf使用
需添加引用
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
有两种方式:
第一种
def get_tablename(a):
return "name"
get_tablename_udf = F.udf(get_tablename)
第二种
@udf
def get_tablename_udf (a):
return "name"
两种方式的调用是一样的
df.withColumn("tablename", get_tablename_udf (df[a"]))
3 分组
使用groupBy方法
单个字段:df.groupBy("a") 或 df.groupBy(df.a)
多个字段:df.groupBy([df.a, df.b]) 或 df.groupBy(["a", "b"])
需要注意的:
groupBy方法后面 一定要跟字段输出方法,如:agg()、select()等
4 查询条件
使用 filter() 或 where() ,两者一样的。
单条件: df.filter(df.a > 1) 或 df.filter("a > 1")
多条件:df.filter("a > 1 and b > 0 ") 或 df.filter((df.a > 1) & (df.b ==0))
5 替换null值
使用 fillna() 或 fill()方法
df.fillna({"a":0, "b":""})
df.na.fill({"a":0, "b":""})
6 排序
使用 orderBy() 或 sort()方法
df.orderBy(df.a.desc())
df.orderBy(desc("age"), asc("name"))
df.orderBy(["age", "name"], ascending=[0, 1])
df.orderBy(["age", "name"], ascending=False)
需要注意的:
ascending 默认为True 升序, False 降序
7 新增列
使用 withColumn() 或 alias()方法
df.withColumn("b",F.lit(999))
df.withColumn("b",df.a)
df.withColumn("b",df.a).withColumn("m","m1")
df.agg(F.lit(ggg).alias("b"))
df.select(F.lit(ggg).alias("b"))
需要注意的:
withColumn方法会覆盖df里面原有的同名的列
8 重命名列名
使用 withColumnRenamed() 方法
df.withColumnRenamed("a","a1").withColumnRenamed("m","m1")
需要注意的点:
确定要重命名的列在df里面存在
9 创建新的DataFrame
使用createDataFrame()方法
spark.createDataFrame(数据集, 列集合) 例如:spark.createDataFrame([(5, "hello")], ['a', 'b'])
需要注意的:
数据集和列集合 个数要一致
spark为 SparkSession 对象, 例如:spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
10 并集
使用union() 或 unionAll() 方法
df.union(df1)
需要注意的:
这两个方法都不会主动消除重复项的,如需要,在后面跟distinct() 如:df.union(df1).distinct()
这两个方法都是按照数据列的摆放顺序进行合并,而不是根据列名
两个结果集的列 数量要保证一样大小
11 交集
使用 intersect()方法
df1.select("a").intersect(df2.select("a"))
返回 df1和df2 中 相同的a 字段
12 差集
使用 subtract()方法
df1.select("a").subtract(df2.select("a"))
返回 df1 有,而df2 没有的 a 字段值。
需要注意的:
取的是df1的数据
13 判断是否NULL值
使用isNull()方法 或 sql语句
df.where(df["a"].isNull())
df.where("a is null")
14 在计算条件中加入判断
使用when() 方法
df.select(when(df.age == 2, 1).alias("age"))
age列的值:当满足when条件,则输出1 ,否则,输出NULL
多个条件 :when((df.age == 2) & (df.name == '"name") , 1)
最新文章
- UVALive 4864 Bit Counting --记忆化搜索 / 数位DP?
- 快速创建 IEqualityComparer 实例:改进
- CodeIgniter2.2.0-在控制器里调用load失败报错的问题
- 简单封装cookie操作
- 《转载》三年建站之路走得一事无成 今来A5撞墙反思
- CSS样式补充代码
- EL&;struts2标签 读取map,list集合
- 使用MySQL中的EXPLAIN解释命令来检查SQL
- web页面相关的一些常见可用字符介绍
- Swiper单页网站简单案例(全屏网页)
- 对redux的粗略理解
- python 练习2
- python的pymysql使用方法【转】
- FortiGate防火墙HA下联堆叠交换机
- 【被C折腾系列】用C调DIOCP编码客户端通信
- 腾讯云ubuntu搭建jdk
- Qt实现探测当前有没有网络连接(Wi-Fi)——QNetworkConfigurationManager.isOnline()
- Codeforces Round #323 (Div. 2) E - Superior Periodic Subarrays
- lua的注释
- Excel开发之旅(三)——添加侧边工具栏