0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

SparkSQL编程基本概念和基本用法

数据分析与开发 来源:算法美食屋 作者:梁云1991 2021-11-02 15:45 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

本节将介绍SparkSQL编程基本概念和基本用法。

不同于RDD编程的命令式编程范式,SparkSQL编程是一种声明式编程范式,我们可以通过SQL语句或者调用DataFrame的相关API描述我们想要实现的操作。

然后Spark会将我们的描述进行语法解析,找到相应的执行计划并对其进行流程优化,然后调用相应基础命令进行执行。

我们使用pyspark进行RDD编程时,在Excutor上跑的很多时候就是Python代码,当然,少数时候也会跑java字节码。

但我们使用pyspark进行SparkSQL编程时,在Excutor上跑的全部是java字节码,pyspark在Driver端就将相应的Python代码转换成了java任务然后放到Excutor上执行。

因此,使用SparkSQL的编程范式进行编程,我们能够取得几乎和直接使用scala/java进行编程相当的效率(忽略语法解析时间差异)。此外SparkSQL提供了非常方便的数据读写API,我们可以用它和Hive表,HDFS,mysql表,Cassandra,Hbase等各种存储媒介进行数据交换。

美中不足的是,SparkSQL的灵活性会稍差一些,其默认支持的数据类型通常只有 Int,Long,Float,Double,String,Boolean 等这些标准SQL数据类型, 类型扩展相对繁琐。对于一些较为SQL中不直接支持的功能,通常可以借助于用户自定义函数(UDF)来实现,如果功能更加复杂,则可以转成RDD来进行实现。

本节我们将主要介绍以下主要内容:

  • RDD和DataFrame的对比

  • 创建DataFrame

  • DataFrame保存成文件

  • DataFrame的API交互

  • DataFrame的SQL交互

importfindspark

#指定spark_home为刚才的解压路径,指定python路径
spark_home="/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path="/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home,python_path)

importpyspark
frompyspark.sqlimportSparkSession

#SparkSQL的许多功能封装在SparkSession的方法接口

spark=SparkSession.builder
.appName("test")
.config("master","local[4]")
.enableHiveSupport()
.getOrCreate()

sc=spark.sparkContext


一,RDD,DataFrame和DataSet对比

DataFrame参照了Pandas的思想,在RDD基础上增加了schma,能够获取列名信息。

DataSet在DataFrame基础上进一步增加了数据类型信息,可以在编译时发现类型错误。

DataFrame可以看成DataSet[Row],两者的API接口完全相同。

DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。

DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。

DataFrame数据结构本质上是通过RDD来实现的,但是RDD是一种行存储的数据结构,而DataFrame是一种列存储的数据结构。

二,创建DataFrame

1,通过toDF方法转换成DataFrame

可以将RDD用toDF方法转换成DataFrame

#将RDD转换成DataFrame
rdd=sc.parallelize([("LiLei",15,88),("HanMeiMei",16,90),("DaChui",17,60)])
df=rdd.toDF(["name","age","score"])
df.show()
df.printSchema()
+---------+---+-----+
|name|age|score|
+---------+---+-----+
|LiLei|15|88|
|HanMeiMei|16|90|
|DaChui|17|60|
+---------+---+-----+

root
|--name:string(nullable=true)
|--age:long(nullable=true)
|--score:long(nullable=true)

2, 通过createDataFrame方法将Pandas.DataFrame转换成pyspark中的DataFrame

importpandasaspd

pdf=pd.DataFrame([("LiLei",18),("HanMeiMei",17)],columns=["name","age"])
df=spark.createDataFrame(pdf)
df.show()
+---------+---+
|name|age|
+---------+---+
|LiLei|18|
|HanMeiMei|17|
+---------+---+
#也可以对列表直接转换
values=[("LiLei",18),("HanMeiMei",17)]
df=spark.createDataFrame(values,["name","age"])
df.show()
+---------+---+
|name|age|
+---------+---+
|LiLei|18|
|HanMeiMei|17|
+---------+---+

4, 通过createDataFrame方法指定schema动态创建DataFrame

可以通过createDataFrame的方法指定rdd和schema创建DataFrame。

这种方法比较繁琐,但是可以在预先不知道schema和数据类型的情况下在代码中动态创建DataFrame.

frompyspark.sql.typesimport*
frompyspark.sqlimportRow
fromdatetimeimportdatetime

schema=StructType([StructField("name",StringType(),nullable=False),
StructField("score",IntegerType(),nullable=True),
StructField("birthday",DateType(),nullable=True)])

rdd=sc.parallelize([Row("LiLei",87,datetime(2010,1,5)),
Row("HanMeiMei",90,datetime(2009,3,1)),
Row("DaChui",None,datetime(2008,7,2))])

dfstudent=spark.createDataFrame(rdd,schema)

dfstudent.show()
+---------+-----+----------+
|name|score|birthday|
+---------+-----+----------+
|LiLei|87|2010-01-05|
|HanMeiMei|90|2009-03-01|
|DaChui|null|2008-07-02|
+---------+-----+----------+

4,通过读取文件创建

可以读取json文件,csv文件,hive数据表或者mysql数据表得到DataFrame。

#读取json文件生成DataFrame
df=spark.read.json("data/people.json")
df.show()
+----+-------+
|age|name|
+----+-------+
|null|Michael|
|30|Andy|
|19|Justin|
+----+-------+
#读取csv文件
df=spark.read.option("header","true")
.option("inferSchema","true")
.option("delimiter",",")
.csv("data/iris.csv")
df.show(5)
df.printSchema()
+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|5.1|3.5|1.4|0.2|0|
|4.9|3.0|1.4|0.2|0|
|4.7|3.2|1.3|0.2|0|
|4.6|3.1|1.5|0.2|0|
|5.0|3.6|1.4|0.2|0|
+-----------+----------+-----------+----------+-----+
onlyshowingtop5rows

root
|--sepallength:double(nullable=true)
|--sepalwidth:double(nullable=true)
|--petallength:double(nullable=true)
|--petalwidth:double(nullable=true)
|--label:integer(nullable=true)
#读取csv文件
df=spark.read.format("com.databricks.spark.csv")
.option("header","true")
.option("inferSchema","true")
.option("delimiter",",")
.load("data/iris.csv")
df.show(5)
df.printSchema()
+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|5.1|3.5|1.4|0.2|0|
|4.9|3.0|1.4|0.2|0|
|4.7|3.2|1.3|0.2|0|
|4.6|3.1|1.5|0.2|0|
|5.0|3.6|1.4|0.2|0|
+-----------+----------+-----------+----------+-----+
onlyshowingtop5rows

root
|--sepallength:double(nullable=true)
|--sepalwidth:double(nullable=true)
|--petallength:double(nullable=true)
|--petalwidth:double(nullable=true)
|--label:integer(nullable=true)
#读取parquet文件
df=spark.read.parquet("data/users.parquet")
df.show()
+------+--------------+----------------+
|name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|null|[3,9,15,20]|
|Ben|red|[]|
+------+--------------+----------------+

#读取hive数据表生成DataFrame

spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")
spark.sql("LOADDATALOCALINPATH'data/kv1.txt'INTOTABLEsrc")
df=spark.sql("SELECTkey,valueFROMsrcWHEREkey< 10 ORDER BY key")
df.show(5)

+---+-----+
|key|value|
+---+-----+
|0|val_0|
|0|val_0|
|0|val_0|
|0|val_0|
|0|val_0|
+---+-----+
onlyshowingtop5rows
#读取mysql数据表生成DataFrame
"""
url="jdbc//localhost:3306/test"
df=spark.read.format("jdbc")
.option("url",url)
.option("dbtable","runoob_tbl")
.option("user","root")
.option("password","0845")
.load()
df.show()
"""

三,DataFrame保存成文件

可以保存成csv文件,json文件,parquet文件或者保存成hive数据表

#保存成csv文件
df=spark.read.format("json").load("data/people.json")
df.write.format("csv").option("header","true").save("data/people_write.csv")
#先转换成rdd再保存成txt文件
df.rdd.saveAsTextFile("data/people_rdd.txt")
#保存成json文件
df.write.json("data/people_write.json")
#保存成parquet文件,压缩格式,占用存储小,且是spark内存中存储格式,加载最快
df.write.partitionBy("age").format("parquet").save("data/namesAndAges.parquet")
df.write.parquet("data/people_write.parquet")
#保存成hive数据表
df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")

四,DataFrame的API交互

frompyspark.sqlimportRow
frompyspark.sql.functionsimport*

df=spark.createDataFrame(
[("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male")]).toDF("name","age","gender")

df.show()
df.printSchema()

+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
+---------+---+------+

root
|--name:string(nullable=true)
|--age:long(nullable=true)
|--gender:string(nullable=true)

1,Action操作

DataFrame的Action操作包括show,count,collect,,describe,take,head,first等操作。

#show
df.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
+---------+---+------+
#show(numRows:Int,truncate:Boolean)
#第二个参数设置是否当输出字段长度超过20时进行截取
df.show(2,False)
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
+---------+---+------+
onlyshowingtop2rows
#count
df.count()
3
#collect
df.collect()
[Row(name='LiLei',age=15,gender='male'),
Row(name='HanMeiMei',age=16,gender='female'),
Row(name='DaChui',age=17,gender='male')]
#first
df.first()
Row(name='LiLei',age=15,gender='male')
#take
df.take(2)
[Row(name='LiLei',age=15,gender='male'),
Row(name='HanMeiMei',age=16,gender='female')]
#head
df.head(2)
[Row(name='LiLei',age=15,gender='male'),
Row(name='HanMeiMei',age=16,gender='female')]

2,类RDD操作

DataFrame支持RDD中一些诸如distinct,cache,sample,foreach,intersect,except等操作。

可以把DataFrame当做数据类型为Row的RDD来进行操作,必要时可以将其转换成RDD来操作。

df=spark.createDataFrame([("HelloWorld",),("HelloChina",),("HelloSpark",)]).toDF("value")
df.show()
+-----------+
|value|
+-----------+
|HelloWorld|
|HelloChina|
|HelloSpark|
+-----------+
#map操作,需要先转换成rdd
rdd=df.rdd.map(lambdax:Row(x[0].upper()))
dfmap=rdd.toDF(["value"]).show()
+-----------+
|value|
+-----------+
|HELLOWORLD|
|HELLOCHINA|
|HELLOSPARK|
+-----------+
#flatMap,需要先转换成rdd
df_flat=df.rdd.flatMap(lambdax:x[0].split("")).map(lambdax:Row(x)).toDF(["value"])
df_flat.show()
+-----+
|value|
+-----+
|Hello|
|World|
|Hello|
|China|
|Hello|
|Spark|
+-----+
#filter过滤
df_filter=df.rdd.filter(lambdas:s[0].endswith("Spark")).toDF(["value"])

df_filter.show()
+-----------+
|value|
+-----------+
|HelloSpark|
+-----------+
#filter和broadcast混合使用
broads=sc.broadcast(["Hello","World"])

df_filter_broad=df_flat.filter(~col("value").isin(broads.value))

df_filter_broad.show()
+-----+
|value|
+-----+
|China|
|Spark|
+-----+
#distinct
df_distinct=df_flat.distinct()
df_distinct.show()

+-----+
|value|
+-----+
|World|
|China|
|Hello|
|Spark|
+-----+
#cache缓存
df.cache()
df.unpersist()
#sample抽样
dfsample=df.sample(False,0.6,0)

dfsample.show()
+-----------+
|value|
+-----------+
|HelloChina|
|HelloSpark|
+-----------+
df2=spark.createDataFrame([["HelloWorld"],["HelloScala"],["HelloSpark"]]).toDF("value")
df2.show()
+-----------+
|value|
+-----------+
|HelloWorld|
|HelloScala|
|HelloSpark|
+-----------+
#intersect交集
dfintersect=df.intersect(df2)

dfintersect.show()
+-----------+
|value|
+-----------+
|HelloSpark|
|HelloWorld|
+-----------+
#exceptAll补集

dfexcept=df.exceptAll(df2)
dfexcept.show()

+-----------+
|value|
+-----------+
|HelloChina|
+-----------+

3,类Excel操作

可以对DataFrame进行增加列,删除列,重命名列,排序等操作,去除重复行,去除空行,就跟操作Excel表格一样。

df=spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)
]).toDF("name","age","gender")

df.show()
df.printSchema()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|null|
+---------+---+------+

root
|--name:string(nullable=true)
|--age:long(nullable=true)
|--gender:string(nullable=true)
#增加列
dfnew=df.withColumn("birthyear",-df["age"]+2020)

dfnew.show()
+---------+---+------+---------+
|name|age|gender|birthyear|
+---------+---+------+---------+
|LiLei|15|male|2005|
|HanMeiMei|16|female|2004|
|DaChui|17|male|2003|
|RuHua|16|null|2004|
+---------+---+------+---------+
#置换列的顺序
dfupdate=dfnew.select("name","age","birthyear","gender")
dfupdate.show()
#删除列
dfdrop=df.drop("gender")
dfdrop.show()
+---------+---+
|name|age|
+---------+---+
|LiLei|15|
|HanMeiMei|16|
|DaChui|17|
|RuHua|16|
+---------+---+
#重命名列
dfrename=df.withColumnRenamed("gender","sex")
dfrename.show()
+---------+---+------+
|name|age|sex|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|null|
+---------+---+------+

#排序sort,可以指定升序降序
dfsorted=df.sort(df["age"].desc())
dfsorted.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|DaChui|17|male|
|RuHua|16|null|
|HanMeiMei|16|female|
|LiLei|15|male|
+---------+---+------+
#排序orderby,默认为升序,可以根据多个字段
dfordered=df.orderBy(df["age"].desc(),df["gender"].desc())

dfordered.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|DaChui|17|male|
|HanMeiMei|16|female|
|RuHua|16|null|
|LiLei|15|male|
+---------+---+------+
#去除nan值行
dfnotnan=df.na.drop()

dfnotnan.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
+---------+---+------+
#填充nan值
df_fill=df.na.fill("female")
df_fill.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|female|
+---------+---+------+
#替换某些值
df_replace=df.na.replace({"":"female","RuHua":"SiYu"})
df_replace.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|SiYu|16|null|
+---------+---+------+
#去重,默认根据全部字段
df2=df.unionAll(df)
df2.show()
dfunique=df2.dropDuplicates()
dfunique.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|null|
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|null|
+---------+---+------+

+---------+---+------+
|name|age|gender|
+---------+---+------+
|RuHua|16|null|
|DaChui|17|male|
|HanMeiMei|16|female|
|LiLei|15|male|
+---------+---+------+
#去重,根据部分字段
dfunique_part=df.dropDuplicates(["age"])
dfunique_part.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|DaChui|17|male|
|LiLei|15|male|
|HanMeiMei|16|female|
+---------+---+------+

#简单聚合操作
dfagg=df.agg({"name":"count","age":"max"})

dfagg.show()
+-----------+--------+
|count(name)|max(age)|
+-----------+--------+
|4|17|
+-----------+--------+

#汇总信息
df_desc=df.describe()
df_desc.show()
+-------+------+-----------------+------+
|summary|name|age|gender|
+-------+------+-----------------+------+
|count|4|4|3|
|mean|null|16.0|null|
|stddev|null|0.816496580927726|null|
|min|DaChui|15|female|
|max|RuHua|17|male|
+-------+------+-----------------+------+
#频率超过0.5的年龄和性别
df_freq=df.stat.freqItems(("age","gender"),0.5)

df_freq.show()
+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|[16]|[male]|
+-------------+----------------+

		

4,类SQL表操作

类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。

df=spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)]).toDF("name","age","gender")

df.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|null|
+---------+---+------+
#表查询select
dftest=df.select("name").limit(2)
dftest.show()
+---------+
|name|
+---------+
|LiLei|
|HanMeiMei|
+---------+
dftest=df.select("name",df["age"]+1)
dftest.show()
+---------+---------+
|name|(age+1)|
+---------+---------+
|LiLei|16|
|HanMeiMei|17|
|DaChui|18|
|RuHua|17|
+---------+---------+
#表查询select
dftest=df.select("name",-df["age"]+2020).toDF("name","birth_year")
dftest.show()
+---------+----------+
|name|birth_year|
+---------+----------+
|LiLei|2005|
|HanMeiMei|2004|
|DaChui|2003|
|RuHua|2004|
+---------+----------+
#表查询selectExpr,可以使用UDF函数,指定别名等
importdatetime
spark.udf.register("getBirthYear",lambdaage:datetime.datetime.now().year-age)
dftest=df.selectExpr("name","getBirthYear(age)asbirth_year","UPPER(gender)asgender")
dftest.show()
+---------+----------+------+
|name|birth_year|gender|
+---------+----------+------+
|LiLei|2005|MALE|
|HanMeiMei|2004|FEMALE|
|DaChui|2003|MALE|
|RuHua|2004|null|
+---------+----------+------+
#表查询where,指定SQL中的where字句表达式
dftest=df.where("gender='male'andage>15")
dftest.show()
+------+---+------+
|name|age|gender|
+------+---+------+
|DaChui|17|male|
+------+---+------+
#表查询filter
dftest=df.filter(df["age"]>16)
dftest.show()
+------+---+------+
|name|age|gender|
+------+---+------+
|DaChui|17|male|
+------+---+------+
#表查询filter
dftest=df.filter("gender='male'")
dftest.show()
+------+---+------+
|name|age|gender|
+------+---+------+
|LiLei|15|male|
|DaChui|17|male|
+------+---+------+
#表连接join
dfscore=spark.createDataFrame([("LiLei","male",88),("HanMeiMei","female",90),("DaChui","male",50)])
.toDF("name","gender","score")

dfscore.show()
+---------+------+-----+
|name|gender|score|
+---------+------+-----+
|LiLei|male|88|
|HanMeiMei|female|90|
|DaChui|male|50|
+---------+------+-----+
#表连接join,根据单个字段
dfjoin=df.join(dfscore.select("name","score"),"name")
dfjoin.show()
+---------+---+------+-----+
|name|age|gender|score|
+---------+---+------+-----+
|LiLei|15|male|88|
|HanMeiMei|16|female|90|
|DaChui|17|male|50|
+---------+---+------+-----+
#表连接join,根据多个字段
dfjoin=df.join(dfscore,["name","gender"])
dfjoin.show()
+---------+------+---+-----+
|name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female|16|90|
|DaChui|male|17|50|
|LiLei|male|15|88|
+---------+------+---+-----+
#表连接join,根据多个字段
#可以指定连接方式为"inner","left","right","outer","semi","full","leftanti","anti"等多种方式
dfjoin=df.join(dfscore,["name","gender"],"right")
dfjoin.show()
+---------+------+---+-----+
|name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female|16|90|
|DaChui|male|17|50|
|LiLei|male|15|88|
+---------+------+---+-----+

dfjoin=df.join(dfscore,["name","gender"],"outer")
dfjoin.show()
+---------+------+---+-----+
|name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female|16|90|
|DaChui|male|17|50|
|LiLei|male|15|88|
|RuHua|null|16|null|
+---------+------+---+-----+
#表连接,灵活指定连接关系
dfmark=dfscore.withColumnRenamed("gender","sex")
dfmark.show()
+---------+------+-----+
|name|sex|score|
+---------+------+-----+
|LiLei|male|88|
|HanMeiMei|female|90|
|DaChui|male|50|
+---------+------+-----+

dfjoin=df.join(dfmark,(df["name"]==dfmark["name"])&(df["gender"]==dfmark["sex"]),
"inner")
dfjoin.show()
+---------+---+------+---------+------+-----+
|name|age|gender|name|sex|score|
+---------+---+------+---------+------+-----+
|HanMeiMei|16|female|HanMeiMei|female|90|
|DaChui|17|male|DaChui|male|50|
|LiLei|15|male|LiLei|male|88|
+---------+---+------+---------+------+-----+

#表合并union
dfstudent=spark.createDataFrame([("Jim",18,"male"),("Lily",16,"female")]).toDF("name","age","gender")
dfstudent.show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jim|18|male|
|Lily|16|female|
+----+---+------+
dfunion=df.union(dfstudent)
dfunion.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|15|male|
|HanMeiMei|16|female|
|DaChui|17|male|
|RuHua|16|null|
|Jim|18|male|
|Lily|16|female|
+---------+---+------+
#表分组groupBy
frompyspark.sqlimportfunctionsasF
dfgroup=df.groupBy("gender").max("age")
dfgroup.show()
+------+--------+
|gender|max(age)|
+------+--------+
|null|16|
|female|16|
|male|17|
+------+--------+
#表分组后聚合,groupBy,agg
dfagg=df.groupBy("gender").agg(F.mean("age").alias("mean_age"),
F.collect_list("name").alias("names"))
dfagg.show()
+------+--------+---------------+
|gender|mean_age|names|
+------+--------+---------------+
|null|16.0|[RuHua]|
|female|16.0|[HanMeiMei]|
|male|16.0|[LiLei,DaChui]|
+------+--------+---------------+

#表分组聚合,groupBy,agg
dfagg=df.groupBy("gender").agg(F.expr("avg(age)"),F.expr("collect_list(name)"))
dfagg.show()

+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
|null|16.0|[RuHua]|
|female|16.0|[HanMeiMei]|
|male|16.0|[LiLei,DaChui]|
+------+--------+------------------+

#表分组聚合,groupBy,agg
df.groupBy("gender","age").agg(F.collect_list(col("name"))).show()
+------+---+------------------+
|gender|age|collect_list(name)|
+------+---+------------------+
|male|15|[LiLei]|
|male|17|[DaChui]|
|female|16|[HanMeiMei]|
|null|16|[RuHua]|
+------+---+------------------+

#表分组后透视,groupBy,pivot
dfstudent=spark.createDataFrame([("LiLei",18,"male",1),("HanMeiMei",16,"female",1),
("Jim",17,"male",2),("DaChui",20,"male",2)]).toDF("name","age","gender","class")
dfstudent.show()
dfstudent.groupBy("class").pivot("gender").max("age").show()
+---------+---+------+-----+
|name|age|gender|class|
+---------+---+------+-----+
|LiLei|18|male|1|
|HanMeiMei|16|female|1|
|Jim|17|male|2|
|DaChui|20|male|2|
+---------+---+------+-----+

+-----+------+----+
|class|female|male|
+-----+------+----+
|1|16|18|
|2|null|20|
+-----+------+----+
#窗口函数

df=spark.createDataFrame([("LiLei",78,"class1"),("HanMeiMei",87,"class1"),
("DaChui",65,"class2"),("RuHua",55,"class2")])
.toDF("name","score","class")

df.show()
dforder=df.selectExpr("name","score","class",
"row_number()over(partitionbyclassorderbyscoredesc)asorder")

dforder.show()
+---------+-----+------+
|name|score|class|
+---------+-----+------+
|LiLei|78|class1|
|HanMeiMei|87|class1|
|DaChui|65|class2|
|RuHua|55|class2|
+---------+-----+------+

+---------+-----+------+-----+
|name|score|class|order|
+---------+-----+------+-----+
|DaChui|65|class2|1|
|RuHua|55|class2|2|
|HanMeiMei|87|class1|1|
|LiLei|78|class1|2|
+---------+-----+------+-----+

		

六,DataFrame的SQL交互

将DataFrame注册为临时表视图或者全局表视图后,可以使用sql语句对DataFrame进行交互。

不仅如此,还可以通过SparkSQL对Hive表直接进行增删改查等操作。

1,注册视图后进行SQL交互

#注册为临时表视图,其生命周期和SparkSession相关联
df=spark.createDataFrame([("LiLei",18,"male"),("HanMeiMei",17,"female"),("Jim",16,"male")],
("name","age","gender"))

df.show()
df.createOrReplaceTempView("student")
dfmale=spark.sql("select*fromstudentwheregender='male'")
dfmale.show()
+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|18|male|
|HanMeiMei|17|female|
|Jim|16|male|
+---------+---+------+

+-----+---+------+
|name|age|gender|
+-----+---+------+
|LiLei|18|male|
|Jim|16|male|
+-----+---+------+
#注册为全局临时表视图,其生命周期和整个Spark应用程序关联

df.createOrReplaceGlobalTempView("student")
query="""
selectt.gender
,collect_list(t.name)asnames
fromglobal_temp.studentt
groupbyt.gender
""".strip("
")

spark.sql(query).show()
#可以在新的Session中访问
spark.newSession().sql("select*fromglobal_temp.student").show()

+------+------------+
|gender|names|
+------+------------+
|female|[HanMeiMei]|
|male|[LiLei,Jim]|
+------+------------+

+---------+---+------+
|name|age|gender|
+---------+---+------+
|LiLei|18|male|
|HanMeiMei|17|female|
|Jim|16|male|
+---------+---+------+

2,对Hive表进行增删改查操作

#删除hive表

query="DROPTABLEIFEXISTSstudents"
spark.sql(query)

#建立hive分区表
#(注:不可以使用中文字段作为分区字段)

query="""CREATETABLEIFNOTEXISTS`students`
(`name`STRINGCOMMENT'姓名',
`age`INTCOMMENT'年龄'
)
PARTITIONEDBY(`class`STRINGCOMMENT'班级',`gender`STRINGCOMMENT'性别')
""".replace("
","")
spark.sql(query)
##动态写入数据到hive分区表
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")#注意此处有一个设置操作
dfstudents=spark.createDataFrame([("LiLei",18,"class1","male"),
("HanMeimei",17,"class2","female"),
("DaChui",19,"class2","male"),
("Lily",17,"class1","female")]).toDF("name","age","class","gender")
dfstudents.show()

#动态写入分区
dfstudents.write.mode("overwrite").format("hive")
.partitionBy("class","gender").saveAsTable("students")
#写入到静态分区
dfstudents=spark.createDataFrame([("Jim",18,"class3","male"),
("Tom",19,"class3","male")]).toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass3")

#INSERTINTO尾部追加,INSERTOVERWRITETABLE覆盖分区
query="""
INSERTOVERWRITETABLE`students`
PARTITION(class='class3',gender='male')
SELECTname,agefromdfclass3
""".replace("
","")
spark.sql(query)
#写入到混合分区
dfstudents=spark.createDataFrame([("David",18,"class4","male"),
("Amy",17,"class4","female"),
("Jerry",19,"class4","male"),
("Ann",17,"class4","female")]).toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass4")

query="""
INSERTOVERWRITETABLE`students`
PARTITION(class='class4',gender)
SELECTname,age,genderfromdfclass4
""".replace("
","")
spark.sql(query)
#读取全部数据

dfdata=spark.sql("select*fromstudents")
dfdata.show()
+---------+---+------+------+
|name|age|class|gender|
+---------+---+------+------+
|Ann|17|class4|female|
|Amy|17|class4|female|
|HanMeimei|17|class2|female|
|DaChui|19|class2|male|
|LiLei|18|class1|male|
|Lily|17|class1|female|
|Jerry|19|class4|male|
|David|18|class4|male|
|Jim|18|class3|male|
|Tom|19|class3|male|
+---------+---+------+------+
责任编辑:haq
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 编程
    +关注

    关注

    90

    文章

    3708

    浏览量

    96765
  • SQL
    SQL
    +关注

    关注

    1

    文章

    789

    浏览量

    46371

原文标题:2 小时入门 SparkSQL 编程

文章出处:【微信号:DBDevs,微信公众号:数据分析与开发】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    TVS二极管的基本概念和主要作用

    芝识课堂的全新内容又和大家见面啦!从本期开始,我们将用四节课为大家系统介绍一位在电路设计中默默奉献的“无名英雄”——TVS二极管。我们会从它的基本概念、工作原理,聊到如何为电路挑选合适的型号、布局
    的头像 发表于 11-28 09:27 3931次阅读
    TVS二极管的<b class='flag-5'>基本概念</b>和主要作用

    C语言的printf基本用法介绍

    大家只需要掌握最基本的用法,以后随着编程知识的学习,我们会逐步介绍更加高级的用法,最终让大家完全掌握 printf。
    发表于 11-12 07:04

    USB/HID及其基本概念

    USB帧概念 如上图所示,在USB1.1规范当中,把USB总线时间按帧划分,每一帧占用时间是1ms; 每一帧内的最开始处是SOF token,在SOF内包含有11位的帧号; 每一帧的SOF帧号相比前
    的头像 发表于 08-20 10:32 3108次阅读
    USB/HID及其<b class='flag-5'>基本概念</b>

    电压波动与闪变的基本概念

    如果您是电力系统工程师、电气设备维护人员或者相关专业的学生,应该注意到了有关电能质量的国家标准GB/T 12326-2008是有关电压波动和闪变的,那这两个参数的考核意义是什么?国家标准规定这两个参数如何计算、测量和考核?这篇文章带您全面了解电压波动和闪变的基本概念、重要性以及国家标准的规定。
    的头像 发表于 07-22 14:10 2138次阅读
    电压波动与闪变的<b class='flag-5'>基本概念</b>

    SiC MOSFET的基本概念

    随着全球对能源效率和可持续发展的关注不断加深,宽禁带半导体材料的研究与应用逐渐成为电子器件行业的热点。碳化硅(SiC)作为一种重要的宽禁带半导体材料,因其优异的电气和热学特性,正在快速取代传统的硅(Si)器件,尤其是在高功率、高温和高频率应用中。SiCMOSFET(金属氧化物半导体场效应晶体管)在电力电子领域的广泛应用正在推动电源转换效率的提高,并助力实现更高效的电能管理。本文将详细探讨SiCMOSFET的应用领域、性能优势及未来发展趋势。
    的头像 发表于 07-08 16:20 732次阅读

    群延迟的基本概念和仿真实例分析

    在高速数字通信和射频系统中,信号从发送端到接收端的传输过程中会遇到各种失真和畸变。群延迟(Group Delay)作为描述系统相位线性度的重要参数,直接影响着信号保真度和系统性能。本文将深入浅出地介绍群延迟的基本概念、应用场景,并通过仿真示例展示其在实际工程中的重要性。
    的头像 发表于 07-08 15:14 1236次阅读
    群延迟的<b class='flag-5'>基本概念</b>和仿真实例分析

    浅谈无线通信的基本概念

    从工作频段到信道的划分,再到多址方式、双工方式、调制方式、分集技术和MIMO,这些概念共同作用,使得无线通信能够高效、可靠地进行。随着技术的不断发展,这些基础技术也在不断演进,尤其是在5G系统中,新的多址方式、双工技术和更复杂的MIMO系统都为未来的通信提供了更多的可能性。
    的头像 发表于 07-04 11:34 1066次阅读

    第十三章 通讯的基本概念

    本章介绍通讯基本概念,包括串行/并行、全双工/半双工/单工、同步/异步通讯,还提及通讯速率中比特率与波特率的概念
    的头像 发表于 05-22 17:29 1749次阅读
    第十三章 通讯的<b class='flag-5'>基本概念</b>

    芯片设计之握手协议

    本文主要介绍握手的基本概念,读者可通过该篇文章对握手有个基本概念
    的头像 发表于 05-14 09:16 963次阅读
    芯片设计之握手协议

    harmony OS NEXT-Navagation基本用法

    # Navagation基本用法 > Navigation组件是路由导航的根视图容器,一般作为Page页面的根容器使用,其内部默认包含了标题栏,内容栏和公工具栏,其中内容区默认首页显示导航内容
    的头像 发表于 04-27 17:39 716次阅读

    无线通信的基本概念

    在当今这个信息爆炸的时代,无线通信已经深入到我们生活的每一个角落。从手机通话、Wi-Fi 上网,到蓝牙耳机、智能手表,无线通信技术让我们的生活变得更加便捷和高效。但你知道吗?这一切的背后,都离不开神秘的电磁波。今天,就让我们一起揭开无线通信的神秘面纱,深入了解它的原理和奥秘。 一、电磁波:无线通信的基石 1. 电磁波的产生 要理解无线通信,我们首先要从电磁波说起。电荷是电场的源头,静止的电荷产生静止的电场,而运动的电荷则产生变化的电场。当电荷定向移动形成电流时,其周围就会存在变化的电场。变化的电场又会产生磁场,均匀变化的电场产生稳定的磁场,非均匀变化的电场则产生变化的磁场。反过来,变化的磁场又会产生变化的电场,电场和磁场就这样在空间中交替变化、相互耦合,向前传播,形成了电磁波。简单来说,电场和磁场的关系就是:变化的电场产生磁场,变化的磁场产生电场。 2. 电磁波的传播特点 电磁波是一种横波,它由同相振荡且互相垂直的电场与磁场在空间中衍生发射,以波动的形式传播,具有波粒二象性。电磁波的传播不需要介质,在真空中也能以光速传播。其传播方向垂直于电场与磁场构成的平面,电场方向、磁场方向和传播方向三者互相垂直。 二、电磁波的频率特性与频谱 1. 电磁波频率特性 电磁波有几个重要的属性:频率、周期和波长。 频率(f):指单位时间(1 秒)内,电磁波传播完整波形的个数。频率越高,就像小步快跑,单位时间内传播的波形越多;频率越低,则像大步流星,单位时间内传播的波形越少。 周期(T):是传输一个完整波形的电磁波所需要的时间。 波长(λ):传播一个完整波形所传输的物理距离,即电磁波向前奔跑的 “步长”。波长与频率成反比,波长越长,频率就越低;波长越短,频率越高。 2. 电磁波频谱 为了更好地了解各种电磁波,人们将它们按照波长或频率、波数、能量的大小顺序进行排列,形成了电磁波谱。从电磁波谱中可以看出,无论是无线电波、红外线,还是光、微波,其本质都是电磁波。无线通信的本质就是利用不同频率的电磁波承载信息。频率越高,在相同时间内可以承载的信息越多,数据速率越高;频率越低,承载的信息越少,数据速率越低。 无线电频谱是电磁波谱的一个子集,涵盖了 9kHz 到 300000GHz 之间频率的电磁波。无线频谱中的波在接收器解码之前是不可见也不可听的,所有无线信号都是通过空气传输的。不同的无线服务与不同的无线频谱区域相关联,例如 AM 广播使用 535 到 1605kHz 之间的频率。 三、无线通信原理 1. 无线通信术语 频率:单位时间内完成周期性变化的次数,描述周期运动频繁程度的量。 频带:将电磁波按频率划分为若干频率连续且宽度一定的区段,即一个电磁波频率连续的频率范围。 带宽:电磁波频带的宽度,在无线通信中,使用一段频率连续的电磁波传播信息,带宽就是电磁波信号的最高频率与最低频率的差值,用 Hz 表示。带宽越大,承载的信息量越大。 2. 无线通信的原理 无线通信是利用电波信号可以在自由空间中传播的特性进行信息交换的一种通信方式。它主要包括以下几个方面: 无线信号发射:通过发射天线产生的电磁波进行传输。信号发送方将数据转换为模拟信号,经过调制、信号放大等过程后,通过天线将信号发射出去。 空间传播:发射的信号在空间中自由传播,但传输过程中会遇到各种干扰,如多径效应(信号在传播中反射、散射等造成的多重信号)。因此,无线通信技术采用频率、编码等方式来提高传输的稳定性和可靠性。 信号接收:通过具有接收功能的天线,将传输中的电磁信号接收回来,经过解调等过程后还原成原始数据,使接收方可以获取到原始数据。 信号处理:接收的信号在数字信号处理中进行解码、去噪、恢复等处理,使得信号能够被准确地识别和处理,达到最终的通信目的。 四、无线通信的常见类型 无线通信有多种类型,常见的包括: Wi-Fi 通信:无线局域网技术,可用于传输数据和连接互联网,具有移动性、无需线缆、随时连接的优势。 蓝牙通信:短距离无线通信技术,可用于连接个人设备,如手机、耳机、蓝牙音响等,具有高速率、低功耗、安全性高等优点。 NFC 通信:近场通信技术,主要用于近距离的两个设备之间直接通信,如付款、数据传输等,通信速率快、交互简单、安全性高。 4G/5G 通信:第四 / 五代移动通信技术,用于无线网络数据传输,具有高速率、宽带、低延迟、高可靠性等优点,支持视频流媒体、云存储、短信通信等。 红外通信:利用红外线在空气中进行数据传输的技术,可用于电视遥控、文件传输、通讯等方面,但通讯距离短、易受遮挡等限制。
    发表于 02-28 13:45

    WIFI的基本概念介绍

    在当今数字化高度普及的时代,WIFI 技术已然成为人们生活、工作与学习中不可或缺的一部分。无论是在家中惬意地浏览网页、观看视频,还是在办公室里高效地处理文件、进行线上会议,又或是在公共场所便捷地连接网络,WIFI 都为我们提供了无缝的网络接入体验。但究竟什么是 WIFI 技术呢? WIFI是一种基于 IEEE 802.11 标准的无线局域网(WLAN)技术。简单来说,它允许电子设备在无需物理线缆连接的情况下,通过无线信号相互通信并接入互联网。其核心
    的头像 发表于 02-05 11:44 3963次阅读

    如何理解电磁波谱的基本概念

    电磁波谱是物理学中一个重要的概念,它涵盖了从极低频率到极高频率的所有电磁波。这些波以波的形式传播,不需要介质,可以在真空中传播。电磁波由电场和磁场组成,它们相互垂直,并且都垂直于波的传播方向。电磁波
    的头像 发表于 01-20 16:32 2345次阅读

    HTTP 协议的基本概念

    HTTP(HyperText Transfer Protocol,超文本传输协议)是一种用于分布式、协作式、超媒体信息系统的网络协议。HTTP 是互联网上应用最为广泛的协议之一,它定义了客户端(比如浏览器)和服务器之间请求和响应的格式。 1. HTTP协议概述 HTTP协议基于TCP/IP协议之上,主要规定了客户端与服务器之间的通信规则。它允许客户端通过发送请求来获取服务器上的资源,服务器则根据请求返回相应的响应。HTTP协议是无状态的,意味着每个请求都是独立的,服务器不会保
    的头像 发表于 12-29 15:12 2233次阅读

    了解虚拟电厂的基本概念

    虚拟电厂的基本概念: 虚拟电厂是一种基于现代信息技术和能源互联网的能源管理模式,它将分散的、可再生能源和储能设备通过虚拟化技术进行集成和管理,形成一个具有集中调度、统一运营和优化控制的虚拟化电力系统
    的头像 发表于 12-24 17:12 2067次阅读
    了解虚拟电厂的<b class='flag-5'>基本概念</b>