spark sql
是用于操作结构化数据的程序包
spark sql
,可以使用SQL
或者 HQL
来查询数据,查询结果以Dataset/DataFrame
的形式返回Hive
表、Parquet
以及 JSON
等SQL
和传统的RDD
变成相结合Dataset
:是一个分布式的数据集合
Spark 1.6
中被添加的新接口RDD
的优点与Spark SQL
执行引擎的优点Scala
和Java
中是可用的。Python
不支持Dataset API
。但是由于Python
的动态特性,许多DataSet API
的优点已经可用DataFrame
:是一个Dataset
组成的指定列。
Scala/Python
中,DataFrame
由DataSet
中的 RowS
(多个Row
) 来表示。在spark 2.0
之后,SQLContext
被 SparkSession
取代。
spark sql
中所有功能的入口点是SparkSession
类。它可以用于创建DataFrame
、注册DataFrame
为table
、在table
上执行SQL
、缓存table
、读写文件等等。
要创建一个SparkSession
,仅仅使用SparkSession.builder
即可:
from pyspark.sql import SparkSession
spark_session = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Builder
用于创建SparkSession
,它的方法有(这些方法都返回self
):
.appName(name)
:给程序设定一个名字,用于在Spark web UI
中展示。如果未指定,则spark
会随机生成一个。
name
:一个字符串,表示程序的名字.config(key=None,value=None,conf=None)
:配置程序。这里设定的配置会直接传递给SparkConf
和 SparkSession
各自的配置。
key
:一个字符串,表示配置名value
:对应配置的值conf
:一个SparkConf
实例有两种设置方式:
通过键值对设置:
xxxxxxxxxx
SparkSession.builder.config("spark.some.config.option", "some-value")
通过已有的SparkConf
设置:
xxxxxxxxxx
SparkSession.builder.config(conf=SparkConf())
.enableHiveSupport()
:开启Hive
支持。(spark 2.0
的新接口)
.master(master)
:设置spark master URL
。如:
master=local
:表示单机本地运行master=local[4]
:表示单机本地4核运行master=spark://master:7077
:表示在一个spark standalone cluster
上运行.getOrCreate()
:返回一个已有的SparkSession
实例;如果没有则基于当前builder
的配置,创建一个新的SparkSession
实例
SparkSession
实例。如果有,则返回它;如果没有,则创建一个作为全局默认SparkSession
实例,并返回它SparkSession
实例,则当前builder
的配置将应用到该实例上.builder = <pyspark.sql.session.Builder object at 0x7f51f134a110>
:一个Builder
实例
.catalog
:一个接口。用户通过它来create、drop、alter、query
底层的数据库、table
以及 function
等
SparkSession.catalog.cacheTable('tableName')
, 来缓存表;通过SparkSession.catalog.uncacheTable('tableName')
来从缓存中删除该表。.conf
:spark
的运行时配置接口。通过它,你可以获取、设置spark、hadoop
的配置。
.read
:返回一个DataFrameReader
,用于从外部存储系统中读取数据并返回DataFrame
.readStream
:返回一个DataStreamReader
,用于将输入数据流视作一个DataFrame
来读取
.sparkContext
:返回底层的SparkContext
.streams
:返回一个StreamingQueryManager
对象,它管理当前上下文的所有活动的StreamingQuery
.udf
:返回一个UDFRegistration
,用于UDF
注册
.version
:返回当前应用的spark
版本
.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
:从RDD
、一个列表、或者pandas.DataFrame
中创建一个DataFrame
参数:
data
:输入数据。可以为一个RDD
、一个列表、或者一个pandas.DataFrame
schema
:给出了DataFrame
的结构化信息。可以为:
data
中推断None
:此时要求data
是一个RDD
,且元素类型为Row、namedtuple、dict
之一。此时结构化信息从data
中推断(推断列名、列类型)pyspqrk.sql.types.StructType
:此时直接指定了每一列数据的类型。pyspark.sql.types.DataType
或者datatype string
:此时直接指定了一列数据的类型,会自动封装成pyspqrk.sql.types.StructType
(只有一列)。此时要求指定的类型与data
匹配(否则抛出异常)samplingRatio
:如果需要推断数据类型,则它指定了需要多少比例的行记录来执行推断。如果为None
,则只使用第一行来推断。
verifySchema
:如果为True
,则根据schema
检验每一行数据
返回值:一个DataFrame
实例
.newSession()
:返回一个新的SparkSession
实例,它拥有独立的SQLConf
、registered temporary views and UDFs
,但是共享同样的SparkContext
以及table cache
。
.range(start,end=None,step=1,numPartitions=None)
:创建一个DataFrame
,它只有一列。该列的列名为id
,类型为pyspark.sql.types.LongType
,数值为区间[start,end)
,间隔为step
(即:list(range(start,end,step))
)
.sql(sqlQuery)
:查询SQL
并以DataFrame
的形式返回查询结果
.stop()
:停止底层的SparkContext
.table(tableName)
:以DataFrame
的形式返回指定的table
SparkSession
中,应用程序可以从一个已经存在的RDD
、HIVE
表、或者spark
数据源中创建一个DataFrame
未指定列名:
xxxxxxxxxx
l = [('Alice', 1)]
spark_session.createDataFrame(l).collect()
结果为:
xxxxxxxxxx
[Row(_1=u'Alice', _2=1)] #自动分配列名
指定列名:
xxxxxxxxxx
l = [('Alice', 1)]
spark_session.createDataFrame(l, ['name', 'age']).collect()
结果为:
xxxxxxxxxx
[Row(name=u'Alice', age=1)]
通过字典指定列名:
xxxxxxxxxx
d = [{'name': 'Alice', 'age': 1}]
spark_session.createDataFrame(d).collect()
结果为:
xxxxxxxxxx
[Row(age=1, name=u'Alice')]
未指定列名:
xxxxxxxxxx
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd).collect()
结果为:
xxxxxxxxxx
[Row(_1=u'Alice', _2=1)] #自动分配列名
指定列名:
xxxxxxxxxx
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd, ['name', 'age']).collect()
结果为:
xxxxxxxxxx
[Row(name=u'Alice', age=1)]
通过Row
来创建:
xxxxxxxxxx
from pyspark.sql import Row
Person = Row('name', 'age')
rdd = sc.parallelize([('Alice', 1)]).map(lambda r: Person(*r))
spark_session.createDataFrame(rdd, ['name', 'age']).collect()
结果为:
xxxxxxxxxx
[Row(name=u'Alice', age=1)]
指定schema
:
xxxxxxxxxx
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd, schema).collect()
结果为:
xxxxxxxxxx
[Row(name=u'Alice', age=1)]
通过字符串指定schema
:
xxxxxxxxxx
rdd = sc.parallelize([('Alice', 1)])
spark_session.createDataFrame(rdd, "a: string, b: int").collect()
结果为:
xxxxxxxxxx
[Row(name=u'Alice', age=1)]
如果只有一列,则字符串schema
为:
xxxxxxxxxx
rdd = sc.parallelize([1])
spark_session.createDataFrame(rdd, "int").collect()
结果为:
xxxxxxxxxx
[Row(value=1)]
使用方式:
xxxxxxxxxx
df = pd.DataFrame({'a':[1,3,5],'b':[2,4,6]})
spark_session.createDataFrame(df).collect()
结果为:
xxxxxxxxxx
[Row(a=1, b=2), Row(a=3, b=4), Row(a=5, b=6)]
从数据源创建的接口是DataFrameReader
:
xxxxxxxxxx
reader = spark_session.read
另外,也可以不使用API
,直接将文件加载到DataFrame
并进行查询:
xxxxxxxxxx
df = spark_session.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
设置数据格式:.format(source)
。
self
xxxxxxxxxx
df = spark_session.read.format('json').load('python/test_support/sql/people.json')
设置数据schema
:.schema(schema)
。
self
schema
。一旦手动指定了schema
,则不再需要推断。加载:.load(path=None, format=None, schema=None, **options)
参数:
path
:一个字符串,或者字符串的列表。指出了文件的路径format
:指出了文件类型。默认为parquet
(除非另有配置spark.sql.sources.default
)schema
:输入数据的schema
,一个StructType
类型实例。options
:其他的参数返回值:一个DataFrame
实例
示例:
xxxxxxxxxx
spark_session.read.format('json').load(['python/test_support/sql/people.json',
'python/test_support/sql/people1.json'])
.csv()
:加载csv
文件,返回一个DataFrame
实例
xxxxxxxxxx
.csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
.jdbc()
:加载数据库中的表
xxxxxxxxxx
.jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
参数:
url
:一个JDBC URL
,格式为:jdbc:subprotocol:subname
table
:表名column
:列名。该列为整数列,用于分区。如果该参数被设置,那么numPartitions、lowerBound、upperBound
将用于分区从而生成where
表达式来拆分该列。lowerBound
:column
的最小值,用于决定分区的步长upperBound
:column
的最大值(不包含),用于决定分区的步长numPartitions
:分区的数量predicates
:一系列的表达式,用于where
中。每一个表达式定义了DataFrame
的一个分区properties
:一个字典,用于定义JDBC
连接参数。通常至少为:{ 'user' : 'SYSTEM', 'password' : 'mypassword'}
返回:一个DataFrame
实例
.json()
:加载json
文件,返回一个DataFrame
实例
xxxxxxxxxx
.json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)
示例:
xxxxxxxxxx
spark_session.read.json('python/test_support/sql/people.json')
# 或者
rdd = sc.textFile('python/test_support/sql/people.json')
spark_session.read.json(rdd)
.orc()
:加载ORC
文件,返回一个DataFrame
实例
xxxxxxxxxx
.orc(path)
示例:
xxxxxxxxxx
spark_session.read.orc('python/test_support/sql/orc_partitioned')
.parquet()
:加载Parquet
文件,返回一个DataFrame
实例
.parquet(*paths)
示例:
xxxxxxxxxx
spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
.table()
: 从table
中创建一个DataFrame
xxxxxxxxxx
.table(tableName)
示例:
xxxxxxxxxx
df = spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
df.createOrReplaceTempView('tmpTable')
spark_session.read.table('tmpTable')
.text()
:从文本中创建一个DataFrame
xxxxxxxxxx
.text(paths)
它不同于.csv()
,这里的DataFrame
只有一列,每行文本都是作为一个字符串。
示例:
xxxxxxxxxx
spark_session.read.text('python/test_support/sql/text-test.txt').collect()
#结果为:[Row(value=u'hello'), Row(value=u'this')]
spark SQL
还支持读取和写入存储在Apache Hive
中的数据。但是由于Hive
具有大量依赖关系,因此这些依赖关系不包含在默认spark
版本中。
Hive
依赖项,则Spark
将会自动加载它们Hive
的依赖关系也必须存在于所有工作节点上配置:将hive-site.xml
、core-site.html
(用于安全配置)、hdfs-site.xml
(用户HDFS
配置) 文件放在conf/
目录中完成配置。
当使用Hive
时,必须使用启用Hive
支持的SparkSession
对象(enableHiveSupport
)
Hive
,则开启Hive
支持不会报错当hive-site.xml
未配置时,上下文会自动在当前目录中创建metastore_db
,并创建由spark.sql.warehouse.dir
指定的目录
访问示例:
xxxxxxxxxx
from pyspark.sql import SparkSession
spark_sess = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", '/home/xxx/yyy/') \
.enableHiveSupport() \
.getOrCreate()
spark_sess.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark_sess.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
spark.sql("SELECT * FROM src").show()
创建Hive
表时,需要定义如何向/从文件系统读写数据,即:输入格式、输出格式。还需要定义该表的数据的序列化与反序列化。
可以通过在OPTIONS
选项中指定这些属性:
xxxxxxxxxx
spark_sess.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive OPTIONS(fileFormat 'parquet')")
可用的选项有:
fileFormat
:文件格式。目前支持6种文件格式:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile'、'avro'
。
inputFormat,outputFormat
:这两个选项将相应的InputFormat
和 OutputFormat
类的名称指定为字符串文字,如'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
fileFormat
,则无法指定它们serde
:该选项指定了serde
类的名称
fileFormat
已经包含了serde
信息(如何序列化、反序列化的信息),则不要指定该选项sequencefile、textfile、rcfile
不包含serde
信息,因此可以使用该选项fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
:这些选项只能与textfile
文件格式一起使用,它们定义了如何将分隔的文件读入行。
DataFrame
通过DataFrameWriter
实例来保存到各种外部存储系统中。
DataFrame.write
来访问DataFrameWriter
.format(source)
:设置数据格式
self
xxxxxxxxxx
df.write.format('json').save('./data.json')
.mode(saveMode)
:当要保存的目标位置已经有数据时,设置该如何保存。
参数: saveMode
可以为:
'append'
:追加写入'overwrite'
:覆写已有数据'ignore'
:忽略本次保存操作(不保存)'error'
:抛出异常(默认行为)返回self
示例:
xxxxxxxxxx
df.write.mode('append').parquet('./data.dat')
.partitionBy(*cols)
:按照指定的列名来将输出的DataFrame
分区。
返回self
示例:
xxxxxxxxxx
df.write.partitionBy('year', 'month').parquet('./data.dat')
.save(path=None, format=None, mode=None, partitionBy=None, **options)
:保存DataFrame
.csv()
:将DataFrame
保存为csv
文件
xxxxxxxxxx
.csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None)
示例:
xxxxxxxxxx
df.write.csv('./data.csv')
.insertInto()
:将DataFrame
保存在table
中
xxxxxxxxxx
.insertInto(tableName, overwrite=False)
它要求当前的DataFrame
与指定的table
具有同样的schema
。其中overwrite
参数指定是否覆盖table
现有的数据。
.jdbc()
:将DataFrame
保存在数据库中
xxxxxxxxxx
.jdbc(url, table, mode=None, properties=None)
参数:
url
:一个JDBC URL
,格式为:jdbc:subprotocol:subname
table
:表名
mode
:指定当数据表中已经有数据时,如何保存。可以为:
'append'
:追加写入'overwrite'
:覆写已有数据'ignore'
:忽略本次保存操作(不保存)'error'
:抛出异常(默认行为)properties
:一个字典,用于定义JDBC
连接参数。通常至少为:{ 'user' : 'SYSTEM', 'password' : 'mypassword'}
.json()
:将DataFrame
保存为json
文件
xxxxxxxxxx
.json(path, mode=None, compression=None, dateFormat=None, timestampFormat=None)
示例:
xxxxxxxxxx
df.write.json('./data.json')
.orc()
:将DataFrame
保存为ORC
文件
xxxxxxxxxx
.orc(path, mode=None, partitionBy=None, compression=None)
.pqrquet()
:将DataFrame
保存为Pqrquet
格式的文件
xxxxxxxxxx
.parquet(path, mode=None, partitionBy=None, compression=None)
.saveAsTable()
:将DataFrame
保存为table
xxxxxxxxxx
.saveAsTable(name, format=None, mode=None, partitionBy=None, **options)
.text()
:将DataFrame
保存为文本文件
xxxxxxxxxx
.text(path, compression=None)
该DataFrame
必须只有一列,切该列必须为字符串。每一行数据将作为文本的一行。
一个DataFrame
实例代表了基于命名列的分布式数据集。
为了访问DataFrame
的列,有两种方式:
df.key
df[key]
。推荐用这种方法,因为它更直观。它并不支持
pandas.DataFrame
中其他的索引,以及各种切片方式
.columns
:以列表的形式返回所有的列名.dtypes
:以列表的形式返回所有的列的名字和数据类型。形式为:[(col_name1,col_type1),...]
.isStreaming
:如果数据集的数据源包含一个或者多个数据流,则返回True
.na
:返回一个DataFrameNaFunctions
对象,用于处理缺失值。.rdd
: 返回DataFrame
底层的RDD
(元素类型为Row
).schema
:返回DataFrame
的 schema
.stat
:返回DataFrameStatFunctions
对象,用于统计.storageLevel
:返回当前的缓存级别.write
:返回一个DataFrameWriter
对象,它是no-streaming DataFrame
的外部存储接口.writeStream
:返回一个DataStreamWriter
对象,它是streaming DataFrame
的外部存储接口聚合操作:
.agg(*exprs)
:在整个DataFrame
开展聚合操作(是df.groupBy.agg()
的快捷方式)
示例:
xxxxxxxxxx
df.agg({"age": "max"}).collect() #在 agg 列上聚合
# 结果为:[Row(max(age)=5)]
# 另一种方式:
from pyspark.sql import functions as F
df.agg(F.max(df.age)).collect()
.filter(condition)
:对行进行过滤。
它是where()
的别名
参数:
condition
:一个types.BooleanType
的Column
,或者一个字符串形式的SQL
的表达式示例:
xxxxxxxxxx
df.filter(df.age > 3).collect()
df.filter("age > 3").collect()
df.where("age = 2").collect()
分组:
.cube(*cols)
:根据当前DataFrame
的指定列,创建一个多维的cube
,从而方便我们之后的聚合过程。
参数:
cols
:指定的列名或者Column
的列表返回值:一个GroupedData
对象
.groupBy(*cols)
:通过指定的列来将DataFrame
分组,从而方便我们之后的聚合过程。
参数:
cols
:指定的列名或者Column
的列表返回值:一个GroupedData
对象
它是groupby
的别名
.rollup(*cols)
:创建一个多维的rollup
,从而方便我们之后的聚合过程。
参数:
cols
:指定的列名或者Column
的列表返回值:一个GroupedData
对象
排序:
.orderBy(*cols, **kwargs)
:返回一个新的DataFrame
,它根据旧的DataFrame
指定列排序
参数:
cols
:一个列名或者Column
的列表,指定了排序列
ascending
:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序
cols
长度相同.sort(*cols, **kwargs)
:返回一个新的DataFrame
,它根据旧的DataFrame
指定列排序
参数:
cols
:一个列名或者Column
的列表,指定了排序列
ascending
:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序
cols
长度相同示例:
xfrom pyspark.sql.functions import *
df.sort(df.age.desc())
df.sort("age", ascending=False)
df.sort(asc("age"))
df.orderBy(df.age.desc())
df.orderBy("age", ascending=False)
df.orderBy(asc("age"))
.sortWithinPartitions(*cols, **kwargs)
:返回一个新的DataFrame
,它根据旧的DataFrame
指定列在每个分区进行排序
参数:
cols
:一个列名或者Column
的列表,指定了排序列
ascending
:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序
cols
长度相同调整分区:
.coalesce(numPartitions)
:返回一个新的DataFrame
,拥有指定的numPartitions
分区。
只能缩小分区数量,而无法扩张分区数量。如果numPartitions
比当前的分区数量大,则新的DataFrame
的分区数与旧DataFrame
相同
它的效果是:不会混洗数据
参数:
numPartitions
:目标分区数量.repartition(numPartitions, *cols)
:返回一个新的DataFrame
,拥有指定的numPartitions
分区。
DataFrame
是通过hash
来分区集合操作:
.crossJoin(other)
:返回一个新的DataFrame
,它是输入的两个DataFrame
的笛卡儿积
可以理解为
[row1,row2]
,其中row1
来自于第一个DataFrame
,row2
来自于第二个DataFrame
参数:
other
:另一个DataFrame
对象.intersect(other)
:返回两个DataFrame
的行的交集
参数:
other
:另一个DataFrame
对象.join(other,on=None,how=None)
:返回两个DataFrame
的 join
参数:
other
:另一个DataFrame
对象
on
:指定了在哪些列上执行对齐。可以为字符串或者Column
(指定单个列)、也可以为字符串列表或者Column
列表(指定多个列)
注意:要求两个DataFrame
都存在这些列
how
:指定join
的方式,默认为'inner'
。可以为: inner
、 cross
、 outer
、
full
、 full_outer
、 left
、 left_outer
、 right
、right_outer
、
left_semi
、 left_anti
.subtract(other)
:返回一个新的DataFrame
,它的行由位于self
中、但是不在other
中的Row
组成。
参数:
other
:另一个DataFrame
对象.union(other)
: 返回两个DataFrame
的行的并集(它并不会去重)
它是unionAll
的别名
参数:
other
:另一个DataFrame
对象统计:
.crosstab(col1, col2)
:统计两列的成对频率。要求每一列的distinct
值数量少于 个。最多返回 对频率。
它是DataFrameStatFunctions.crosstab()
的别名
结果的第一列的列名为,col1_col2
,值就是第一列的元素值。后面的列的列名就是第二列元素值,值就是对应的频率。
参数:
col1,col2
:列名字符串(或者Column
)示例:
xxxxxxxxxx
df =pd.DataFrame({'a':[1,3,5],'b':[2,4,6]})
s_df = spark_session.createDataFrame(df)
s_df.crosstab('a','b').collect()
#结果: [Row(a_b='5', 2=0, 4=0, 6=1), Row(a_b='1', 2=1, 4=0, 6=0), Row(a_b='3', 2=0, 4=1, 6=0)]
.describe(*cols)
:计算指定的数值列、字符串列的统计值。
统计结果包括:count、mean、stddev、min、max
该函数仅仅用于探索数据规律
参数:
cols
:列名或者多个列名字符串(或者Column
)。如果未传入任何列名,则计算所有的数值列、字符串列.freqItems(cols,support=None)
:寻找指定列中频繁出现的值(可能有误报)
它是DataFrameStatFunctions.freqItems()
的别名
参数:
cols
:字符串的列表或者元组,指定了待考察的列support
:指定所谓的频繁的标准(默认是 1%)。该数值必须大于 移除数据:
.distinct()
:返回一个新的DataFrame
,它保留了旧DataFrame
中的distinct
行。
即:根据行来去重
.drop(*cols)
:返回一个新的DataFrame
,它剔除了旧DataFrame
中的指定列。
参数:
cols
:列名字符串(或者Column
)。如果它在旧DataFrame
中不存在,也不做任何操作(也不报错).dropDuplicates(subset=None)
:返回一个新的DataFrame
,它剔除了旧DataFrame
中的重复行。
它与.distinct()
区别在于:它仅仅考虑指定的列来判断是否重复行。
参数:
subset
:列名集合(或者Column
的集合)。如果为None
,则考虑所有的列。.drop_duplicates
是 .dropDuplicates
的别名
.dropna(how='any', thresh=None, subset=None)
:返回一个新的DataFrame
,它剔除了旧DataFrame
中的null
行。
它是DataFrameNaFunctions.drop()
的别名
参数:
how
:指定如何判断null
行的标准。'all'
:所有字段都是na
,则是空行;'any'
:任何字段存在na
,则是空行。thresh
:一个整数。当一行中,非null
的字段数量小于thresh
时,认为是空行。如果该参数设置,则不考虑how
subset
:列名集合,给出了要考察的列。如果为None
,则考察所有列。.limit(num)
:返回一个新的DataFrame
,它只有旧DataFrame
中的num
行。
采样、拆分:
.randomSplit(weights, seed=None)
:返回一组新的DataFrame
,它是旧DataFrame
的随机拆分
参数:
weights
:一个double
的列表。它给出了每个结果DataFrame
的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0seed
:随机数种子示例:
xxxxxxxxxx
splits = df.randomSplit([1.0, 2.0], 24)
splits[0].count()
.sample(withReplacement, fraction, seed=None)
:返回一个新的DataFrame
,它是旧DataFrame
的采样
参数:
withReplacement
:如果为True
,则可以重复采样;否则是无放回采样
fractions
:新的DataFrame
的期望大小(占旧DataFrame
的比例)。spark
并不保证结果刚好满足这个比例(只是一个期望值)
withReplacement=True
:则表示每个元素期望被选择的次数withReplacement=False
:则表示每个元素期望被选择的概率seed
:随机数生成器的种子
.sampleBy(col, fractions, seed=None)
:返回一个新的DataFrame
,它是旧DataFrame
的采样
它执行的是无放回的分层采样。分层由col
列指定。
参数:
col
:列名或者Column
,它给出了分层的依据fractions
:一个字典,给出了每个分层抽样的比例。如果某层未指定,则其比例视作 0示例:
xxxxxxxxxx
sampled = df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
# df['key'] 这一列作为分层依据,0 抽取 10%, 1 抽取 20%
替换:
.replace(to_replace, value=None, subset=None)
:返回一组新的DataFrame
,它是旧DataFrame
的数值替代结果
它是DataFrameNaFunctions.replace()
的别名
当替换时,value
将被类型转换到目标列
参数:
to_replace
:可以为布尔、整数、浮点数、字符串、列表、字典,给出了被替代的值。
value
:一个整数、浮点数、字符串、列表。给出了替代值。
subset
:列名的列表。指定要执行替代的列。
.fillna(value, subset=None)
:返回一个新的DataFrame
,它替换了旧DataFrame
中的null
值。
它是DataFrameNaFunctions.fill()
的别名
参数:
value
:一个整数、浮点数、字符串、或者字典,用于替换null
值。如果是个字典,则忽略subset
,字典的键就是列名,指定了该列的null
值被替换的值。subset
:列名集合,给出了要被替换的列选取数据:
.select(*cols)
:执行一个表达式,将其结果返回为一个DataFrame
参数:
cols
:一个列名的列表,或者Column
表达式。如果列名为*
,则扩张到所有的列名示例:
xxxxxxxxxx
df.select('*')
df.select('name', 'age')
df.select(df.name, (df.age + 10).alias('age'))
.selectExpr(*expr)
:执行一个SQL
表达式,将其结果返回为一个DataFrame
参数:
expr
:一组SQL
的字符串描述示例:
xxxxxxxxxx
df.selectExpr("age * 2", "abs(age)")
.toDF(*cols)
:选取指定的列组成一个新的DataFrame
参数:
cols
:列名字符串的列表.toJSON(use_unicode=True)
:返回一个新的DataFrame
,它将旧的DataFrame
转换为RDD
(元素为字符串),其中每一行转换为json
字符串。
列操作:
.withColumn(colName, col)
:返回一个新的DataFrame
,它将旧的DataFrame
增加一列(或者替换现有的列)
参数:
colName
:一个列名,表示新增的列(如果是已有的列名,则是替换的列)col
:一个Column
表达式,表示新的列示例:
xxxxxxxxxx
df.withColumn('age2', df.age + 2)
.withColumnRenamed(existing, new)
:返回一个新的DataFrame
,它将旧的DataFrame
的列重命名
参数:
existing
:一个字符串,表示现有的列的列名col
:一个字符串,表示新的列名查看数据:
.collect()
:以Row
的列表的形式返回所有的数据
.first()
:返回第一行(一个Row
对象)
.head(n=None)
:返回前面的n
行
参数:
n
:返回行的数量。默认为1返回值:
Row
对象Row
的列表.show(n=20, truncate=True)
:在终端中打印前 n
行。
它并不返回结果,而是print
结果
参数:
n
:打印的行数truncate
:如果为True
,则超过20个字符的字符串被截断。如果为一个数字,则长度超过它的字符串将被截断。.take(num)
:以Row
的列表的形式返回开始的num
行数据。
参数:
num
:返回行的数量.toLocalIterator()
:返回一个迭代器,对它迭代的结果就是DataFrame
的每一行数据(Row
对象)
统计:
.corr(col1, col2, method=None)
:计算两列的相关系数,返回一个浮点数。当前仅支持皮尔逊相关系数
DataFrame.corr()
是DataFrameStatFunctions.corr()
的别名
参数:
col,col2
:为列的名字字符串(或者Column
)。method
:当前只支持'pearson'
.cov(col1,col2)
:计算两列的协方差。
DataFrame.cov()
是DataFrameStatFunctions.cov()
的别名
参数:
col,col2
:为列的名字字符串(或者Column
).count()
:返回当前DataFrame
有多少行
遍历:
.foreach(f)
:对DataFrame
中的每一行应用f
df.rdd.foreach()
的快捷方式.foreachPartition(f)
:对DataFrame
的每个分区应用f
它是df.rdd.foreachPartition()
的快捷方式
示例:
xxxxxxxxxx
def f(person):
print(person.name)
df.foreach(f)
def f(people):
for person in people:
print(person.name)
df.foreachPartition(f)
.toPandas()
:将DataFrame
作为pandas.DataFrame
返回
缓存:
.cache()
:使用默认的storage level
缓存DataFrame
(缓存级别为:MEMORY_AND_DISK
)
.persist(storageLevel=StorageLevel(True, True, False, False, 1))
:缓存DataFrame
参数:
storageLevel
:缓存级别。默认为MEMORY_AND_DISK
.unpersist(blocking=False)
:标记该DataFrame
为未缓存的,并且从内存和磁盘冲移除它的缓存块。
.isLocal()
:如果collect()
和 take()
方法能本地运行(不需要任何executor
节点),则返回True
。否则返回False
.printSchema()
:打印DataFrame
的 schema
.createTempView(name)
:创建一个临时视图,name
为视图名字。
临时视图是session
级别的,会随着session
的消失而消失。
如果指定的临时视图已存在,则抛出TempTableAlreadyExistsException
异常。
参数:
name
:视图名字示例:
xxxxxxxxxx
df.createTempView("people")
df2 = spark_session.sql("select * from people")
.createOrReplaceTempView(name)
:创建一个临时视图,name
为视图名字。如果该视图已存在,则替换它。
参数:
name
:视图名字.createGlobalTempView(name)
:创建一个全局临时视图,name
为视图名字
spark sql
中的临时视图是session
级别的,会随着session
的消失而消失。如果希望一个临时视图跨session
而存在,则可以建立一个全局临时视图。
如果指定的全局临时视图已存在,则抛出TempTableAlreadyExistsException
异常。
全局临时视图存在于系统数据库global_temp
中,必须加上库名取引用它
参数:
name
:视图名字示例:
xxxxxxxxxx
df.createGlobalTempView("people")
spark_session.sql("SELECT * FROM global_temp.people").show()
.createOrReplaceGlobalTempView(name)
:创建一个全局临时视图,name
为视图名字。如果该视图已存在,则替换它。
参数:
name
:视图名字.registerTempTable(name)
:创建一个临时表,name
为表的名字。
在
spark 2.0
中被废弃,推荐使用createOrReplaceTempView
.explain(extended=False)
:打印logical plan
和physical plan
,用于调试模式
参数:
extended
:如果为False
,则仅仅打印physical plan
一个Row
对象代表了DataFrame
的一行
你可以通过两种方式来访问一个Row
对象:
row.key
row[key]
key in row
将在Row
的键上遍历(而不是值上遍历)
创建Row
:通过关键字参数来创建:
xxxxxxxxxx
row = Row(name="Alice", age=11)
None
,则必须显式指定,而不能忽略你可以创建一个Row
作为一个类来使用,它的作用随后用于创建具体的Row
xxxxxxxxxx
Person = Row("name", "age")
p1 = Person("Alice", 11)
方法:
.asDict(recursive=False)
:以字典的方式返回该Row
实例。如果recursive=True
,则递归的处理元素中包含的Row
Column
代表了DataFrame
的一列
有两种创建Column
的方式:
通过DataFrame
的列名来创建:
xxxxxxxxxx
df.colName
df['colName']
通过Column
表达式来创建:
xxxxxxxxxx
df.colName+1
1/df['colName']
.alias(*alias, **kwargs)
:创建一个新列,它给旧列一个新的名字(或者一组名字,如explode
表达式会返回多列)
name()
的别名参数:
alias
:列的别名metadata
:一个字符串,存储在列的metadata
属性中示例:
xxxxxxxxxx
df.select(df.age.alias("age2"))
# 结果为: [Row(age2=2), Row(age2=5)]
df.select(df.age.alias("age3",metadata={'max': 99})
).schema['age3'].metadata['max']
# 结果为: 99
排序:
.asc()
:创建一个新列,它是旧列的升序排序的结果.desc()
:创建一个新列,它是旧列的降序排序的结果.astype(dataType)
:创建一个新列,它是旧列的数值转换的结果
.cast()
的别名.between(lowerBound, upperBound)
:创建一个新列,它是一个布尔值。如果旧列的数值在[lowerBound, upperBound]
(闭区间)之内,则为True
逻辑操作:返回一个新列,是布尔值。other
为另一Column
.bitwiseAND(other)
:二进制逻辑与.bitwiseOR(other)
:二进制逻辑或.bitwiseXOR(other)
:二进制逻辑异或元素抽取:
.getField(name)
:返回一个新列,是旧列的指定字段组成。
此时要求旧列的数据是一个StructField
(如Row
)
参数:
name
:一个字符串,是字段名示例:
xxxxxxxxxx
df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df.select(df.r.getField("b"))
#或者
df.select(df.r.a)
.getItem(key)
:返回一个新列,是旧列的指定位置(列表),或者指定键(字典)组成。
参数:
key
:一个整数或者一个字符串示例:
xxxxxxxxxx
df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
df.select(df.l.getItem(0), df.d.getItem("key"))
#或者
df.select(df.l[0], df.d["key"])
判断:
.isNotNull()
:返回一个新列,是布尔值。表示旧列的值是否非null
.isNull()
:返回一个新列,是布尔值。表示旧列的值是否null
.isin(*cols)
:返回一个新列,是布尔值。表示旧列的值是否在cols
中
参数:
cols
:一个列表或者元组示例:
xxxxxxxxxx
df[df.name.isin("Bob", "Mike")]
df[df.age.isin([1, 2, 3])]
like(other)
:返回一个新列,是布尔值。表示旧列的值是否like other
。它执行的是SQL
的 like
语义
参数:
other
:一个字符串,是SQL like
表达式示例:
xxxxxxxxxx
df.filter(df.name.like('Al%'))
rlike(other)
:返回一个新列,是布尔值。表示旧列的值是否rrlike other
。它执行的是SQL
的 rlike
语义
参数:
other
:一个字符串,是SQL rlike
表达式字符串操作:other
为一个字符串。
.contains(other)
:返回一个新列,是布尔值。表示是否包含other
。
.endswith(other)
:返回一个新列,是布尔值。表示是否以other
结尾。
示例:
xxxxxxxxxx
df.filter(df.name.endswith('ice'))
.startswith(other)
:返回一个新列,是布尔值。表示是否以other
开头。
.substr(startPos, length)
:返回一个新列,它是旧列的子串
参数:
startPos
:子串开始位置(整数或者Column
)length
:子串长度(整数或者Column
).when(condition, value)
:返回一个新列。
对条件进行求值,如果满足条件则返回value
,如果不满足:
.otherwise()
调用,则返回otherwise
的结果.otherwise()
调用,则返回None
参数:
condition
:一个布尔型的Column
表达式value
:一个字面量值,或者一个Column
表达式示例:
xxxxxxxxxx
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0))
.otherwise(value)
:value
为一个字面量值,或者一个Column
表达式
GroupedData
通常由DataFrame.groupBy()
创建,用于分组聚合.agg(*exprs)
:聚合并以DataFrame
的形式返回聚合的结果
可用的聚合函数包括:avg、max、min、sum、count
参数:
exprs
:一个字典,键为列名,值为聚合函数字符串。也可以是一个Column
的列表示例:
xxxxxxxxxx
df.groupBy(df.name).agg({"*": "count"}) #字典
# 或者
from pyspark.sql import functions as F
df.groupBy(df.name).agg(F.min(df.age)) #字典
统计:
.avg(*cols)
:统计数值列每一组的均值,以DataFrame
的形式返回
它是mean()
的别名
参数:
cols
:列名或者列名的列表示例:
xxxxxxxxxx
df.groupBy().avg('age')
df.groupBy().avg('age', 'height')
.count()
:统计每一组的记录数量,以DataFrame
的形式返回
.max(*cols)
:统计数值列每一组的最大值,以DataFrame
的形式返回
参数:
cols
:列名或者列名的列表.min(*cols)
:统计数值列每一组的最小值,以DataFrame
的形式返回
参数:
cols
:列名或者列名的列表.sum(*cols)
:统计数值列每一组的和,以DataFrame
的形式返回
参数:
cols
:列名或者列名的列表.pivot(pivot_col, values=None)
:对指定列进行透视。
参数:
pivot_col
:待分析的列的列名values
:待分析的列上,待考察的值的列表。如果为空,则spark
会首先计算pivot_col
的 distinct
值示例:
xxxxxxxxxx
df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings")
#结果为:[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
# "dotNET", "Java" 是 course 字段的值
pyspark.sql.functions
模块提供了一些内建的函数,它们用于创建Column
col
,表示列名或者Column
。Column
这里的col
都是数值列。
abs(col)
:计算绝对值
acos(col)
:计算acos
cos(col)
:计算cos
值
cosh(col)
:计算cosh
值
asin(col)
:计算asin
atan(col)
:计算atan
atan2(col1,col2)
:计算从直角坐标 到极坐标 的角度
bround(col,scale=0)
:计算四舍五入的结果。如果scale>=0
,则使用HALF_EVEN
舍入模式;如果scale<0
,则将其舍入到整数部分。
cbrt(col)
:计算立方根
ceil(col)
:计算ceiling
值
floor(col)
:计算floor
值
corr(col1,col2)
:计算两列的皮尔逊相关系数
covar_pop(col1,col2)
:计算两列的总体协方差 (公式中的除数是 N
)
covar_samp(col1,col2)
:计算两列的样本协方差 (公式中的除数是 N-1
)
degrees(col)
:将弧度制转换为角度制
radians(col)
:将角度制转换为弧度制
exp(col)
:计算指数:
expml(col)
:计算指数减一:
fractorial(col)
:计算阶乘
pow(col1,col2)
:返回幂级数
hash(*cols)
:计算指定的一些列的hash code
,返回一个整数列
参数:
cols
:一组列名或者Columns
hypot(col1,col2)
:计算 (没有中间产出的上溢出、下溢出),返回一个数值列
log(arg1,arg2=None)
:计算对数。其中第一个参数为底数。如果只有一个参数,则使用自然底数。
参数:
arg1
:如果有两个参数,则它给出了底数。否则就是对它求自然底数。arg2
:如果有两个参数,则对它求对数。log10(col)
:计算基于10的对数
log1p(col)
:计算
log2(col)
:计算基于2的对数
rand(seed=None)
:从均匀分布U~[0.0,1.0]
生成一个独立同分布(i.i.d
) 的随机列
参数:
seed
:一个整数,表示随机数种子。randn(seed=None)
:从标准正态分布N~(0.0,1.0)
生成一个独立同分布(i.i.d
) 的随机列
参数:
seed
:一个整数,表示随机数种子。rint(col)
:返回最接近参数值的整数的double
形式。
round(col,scale=0)
:返回指定参数的四舍五入形式。
如果scale>=0
,则使用HALF_UP
的舍入模式;否则直接取参数的整数部分。
signum(col)
:计算正负号
sin(col)
:计算sin
sinh(col)
:计算 sinh
sqrt(col)
:计算平方根
tan(col)
:计算tan
tanh(col)
:计算tanh
toDegreees(col)
:废弃。使用degrees()
代替
toRadias(col)
:废弃,使用radians()
代替
ascii(col)
:返回一个数值列,它是旧列的字符串中的首个字母的ascii
值。其中col
必须是字符串列。
base64(col)
:返回一个字符串列,它是旧列(二进制值)的BASE64
编码得到的字符串。其中col
必须是二进制列。
bin(col)
:返回一个字符串列,它是旧列(二进制值)的字符串表示(如二进制1101
的字符串表示为'1101'
)其中col
必须是二进制列。
cov(col,fromBase,toBase)
:返回一个字符串列,它是一个数字的字符串表达从fromBase
转换到toBase
。
参数:
col
:一个字符串列,它是数字的表达。如1028
。它的基数由fromBase
给出fromBase
:一个整数,col
中字符串的数值的基数。toBase
:一个整数,要转换的数值的基数。示例:
xxxxxxxxxx
df = spark_session.createDataFrame([("010101",)], ['n'])
df.select(conv(df.n, 2, 16).alias('hex')).collect()
# 结果:[Row(hex=u'15')]
concat(*cols)
:创建一个新列,它是指定列的字符串拼接的结果(没有分隔符)。
参数
cols
:列名字符串列表,或者Column
列表。要求这些列具有同样的数据类型concat_ws(sep,*cols)
:创建一个新列,它是指定列的字符串使用指定的分隔符拼接的结果。
参数
sep
:一个字符串,表示分隔符cols
:列名字符串列表,或者Column
列表。要求这些列具有同样的数据类型decode(col,charset)
:从二进制列根据指定字符集来解码成字符串。
参数:
col
:一个字符串或者Column
,为二进制列charset
:一个字符串,表示字符集。encode(col,charset)
:把字符串编码成二进制格式。
参数:
col
:一个字符串或者Column
,为字符串列charset
:一个字符串,表示字符集。format_number(col,d)
:格式化数值成字符串,根据HALF_EVEN
来四舍五入成d
位的小数。
参数:
col
:一个字符串或者Column
,为数值列d
:一个整数,格式化成表示d
位小数。format_string(format,*cols)
:返回print
风格的格式化字符串。
参数:
format
:print
风格的格式化字符串。如%s%d
cols
:一组列名或者Columns
,用于填充format
hex(col)
:计算指定列的十六进制值(以字符串表示)。
参数:
col
:一个字符串或者Column
,为字符串列、二进制列、或者整数列initcap(col)
:将句子中每个单词的首字母大写。
参数:
col
:一个字符串或者Column
,为字符串列input_file_name()
:为当前的spark task
的文件名创建一个字符串列
instr(str,substr)
:给出substr
在str
的首次出现的位置。位置不是从0开始,而是从1开始的。
如果substr
不在str
中,则返回 0 。
如果str
或者 substr
为null
,则返回null
。
参数:
str
:一个字符串或者Column
,为字符串列substr
:一个字符串locate(substr,str,pos=1)
:给出substr
在str
的首次出现的位置(在pos
之后)。位置不是从0开始,而是从1开始的。
如果substr
不在str
中,则返回 0 。
如果str
或者 substr
为null
,则返回null
。
参数:
str
:一个字符串或者Column
,为字符串列substr
:一个字符串pos
::起始位置(基于0开始)length(col)
:计算字符串或者字节的长度。
参数:
col
:一个字符串或者Column
,为字符串列,或者为字节列。levenshtein(left,right)
:计算两个字符串之间的Levenshtein
距离。
Levenshtein
距离:刻画两个字符串之间的差异度。它是从一个字符串修改到另一个字符串时,其中编辑单个字符串(修改、插入、删除)所需要的最少次数。
lower(col)
:转换字符串到小写
lpad(col,len,pad)
:对字符串,向左填充。
参数:
col
:一个字符串或者Column
,为字符串列len
:预期填充后的字符串长度pad
:填充字符串ltrim(col)
:裁剪字符串的左侧空格
md5(col)
:计算指定列的MD5
值(一个32字符的十六进制字符串)
regexp_extract(str,pattern,idx)
:通过正则表达式抽取字符串中指定的子串 。
参数:
str
:一个字符串或者Column
,为字符串列,表示被抽取的字符串。pattern
: 一个Java
正则表达式子串。idx
:表示抽取第几个匹配的结果。返回值:如果未匹配到,则返回空字符串。
.regexp_replace(str,pattern,replacement)
: 通过正则表达式替换字符串中指定的子串。
参数:
str
:一个字符串或者Column
,为字符串列,表示被替换的字符串。pattern
: 一个Java
正则表达式子串。replacement
:表示替换的子串返回值:如果未匹配到,则返回空字符串。
repeat(col,n)
:重复一个字符串列n
次,结果返回一个新的字符串列。
参数:
col
:一个字符串或者Column
,为字符串列n
:一个整数,表示重复次数reverse(col)
:翻转一个字符串列,结果返回一个新的字符串列
rpad(col,len,pad)
:向右填充字符串到指定长度。
参数:
col
:一个字符串或者Column
,为字符串列len
: 指定的长度pad
:填充字符串 rtrim(col)
:剔除字符串右侧的空格符
sha1(col)
: 以16进制字符串的形式返回SHA-1
的结果
sha2(col,numBites)
:以16进制字符串的形式返回SHA-2
的结果。
numBites
指定了结果的位数(可以为 244,256,384,512
,或者0
表示256
)
soundex(col)
:返回字符串的SoundEx
编码
split(str,pattern)
: 利用正则表达式拆分字符串。产生一个array
列
参数:
str
:一个字符串或者Column
,为字符串列pattern
:一个字符串,表示正则表达式substring(str,pos,len)
:抽取子串。
参数:
str
:一个字符串或者Column
,为字符串列,或者字节串列pos
:抽取的起始位置len
:抽取的子串长度返回值:如果str
表示字符串列,则返回的是子字符串。如果str
是字节串列,则返回的是字节子串。
substring_index(str,delim,count)
:抽取子串
参数:
str
: 一个字符串或者Column
,为字符串列delim
:一个字符串,表示分隔符count
:指定子串的下标。 如果为正数,则从左开始,遇到第count
个delim
时,返回其左侧的内容; 如果为负数,则从右开始,遇到第abs(count)
个delim
时,返回其右侧的内容;示例:
xxxxxxxxxx
df = spark.createDataFrame([('a.b.c.d',)], ['s'])
df.select(substring_index(df.s, '.', 2).alias('s')).collect()
# [Row(s=u'a.b')]
df.select(substring_index(df.s, '.', -3).alias('s')).collect()
# [Row(s=u'b.c.d')]
translate(srcCol,matching,replace)
:将srcCol
中指定的字符替换成另外的字符。
参数:
srcCol
: 一个字符串或者Column
,为字符串列matching
: 一个字符串。只要srcCol
中的字符串,有任何字符匹配了它,则执行替换replace
:它一一对应于matching
中要替换的字符示例:
xxxxxxxxxx
df = spark.createDataFrame([('translate',)], ['a'])
df.select(translate('a', "rnlt", "123") .alias('r')).collect()
# [Row(r=u'1a2s3ae')]
# r->1, n->2,l->3, t->空字符
trim(col)
:剔除字符串两侧的空格符
unbase64(col)
: 对字符串列执行BASE64
编码,并且返回一个二进制列
unhex(col)
:对字符串列执行hex
的逆运算。 给定一个十进制数字字符串,将其逆转换为十六进制数字字符串。
upper(col)
:将字符串列转换为大写格式
add_months(start, months)
:增加月份
参数:
start
:列名或者Column
表达式,指定起始时间months
:指定增加的月份示例:
xxxxxxxxxx
df = spark_session.createDataFrame([('2015-04-08',)], ['d'])
df.select(add_months(df.d, 1).alias('d'))
# 结果为:[Row(d=datetime.date(2015, 5, 8))]
current_data()
:返回当前日期作为一列
current_timestamp()
:返回当前的时间戳作为一列
date_add(start,days)
:增加天数
参数:
start
:列名或者Column
表达式,指定起始时间days
:指定增加的天数date_sub(start,days)
:减去天数
参数:
start
:列名或者Column
表达式,指定起始时间days
:指定减去的天数date_diff(end,start)
:返回两个日期之间的天数差值
参数:
end
:列名或者Column
表达式,指定结束时间。为date/timestamp/string
start
:列名或者Column
表达式,指定起始时间。为date/timestamp/string
date_format(date,format)
:转换date/timestamp/string
到指定格式的字符串。
参数:
date
:一个date/timestamp/string
列的列名或者Column
format
:一个字符串,指定了日期的格式化形式。支持java.text.SimpleDateFormat
的所有格式。dayofmonth(col)
:返回日期是当月的第几天(一个整数)。其中col
为date/timestamp/string
dayofyear(col)
:返回日期是当年的第几天(一个整数)。其中col
为date/timestamp/string
from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
:转换unix
时间戳到指定格式的字符串。
参数:
timestamp
:时间戳的列format
:时间格式化字符串from_utc_timestamp(timestamp, tz)
:转换unix
时间戳到指定时区的日期。
hour(col)
:从指定时间中抽取小时,返回一个整数列
参数:
col
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列minute(col)
:从指定时间中抽取分钟,返回一个整数列
参数:
col
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列second(col)
:从指定的日期中抽取秒,返回一个整数列。
参数:
col
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列month(col)
:从指定时间中抽取月份,返回一个整数列
参数:
col
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列quarter(col)
:从指定时间中抽取季度,返回一个整数列
参数:
col
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列last_day(date)
:返回指定日期的当月最后一天(一个datetime.date
)
参数:
date
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列months_between(date1,date2)
:返回date1
到 date2
之间的月份(一个浮点数)。
也就是date1-date2
的天数的月份数量。如果为正数,表明date1 > date2
。
参数:
date1
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列date2
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列next_day(date,dayOfWeek)
:返回指定天数之后的、且匹配dayOfWeek
的那一天。
参数:
date1
:一个字符串或者Column
。是表示时间的字符串列,或者datetime
列dayOfWeek
:指定星期几。是大小写敏感的,可以为:'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'
to_date(col,format=None)
:转换pyspark.sql.types.StringType
或者pyspark.sql.types.TimestampType
到 pyspark.pysql.types.DateType
参数:
col
:一个字符串或者Column
。是表示时间的字符串列format
:指定的格式。默认为yyyy-MM-dd
to_timestamp(col,format=None)
:将StringType,TimestampType
转换为DataType
。
参数:
col
:一个字符串或者Column
。是表示时间的字符串列format
:指定的格式。默认为yyyy-MM-dd HH:mm:ss
to_utc_timestamp(timestamp,tz)
:根据给定的时区,将StringType,TimestampType
转换为DataType
。
参数:
col
:一个字符串或者Column
。是表示时间的字符串列tz
:一个字符串,表示时区trunc(date,format)
:裁剪日期到指定的格式 。
参数:
date
:一个字符串或者Column
。是表示时间的字符串列format
:指定的格式。如: 'year','YYYY','yy','month','mon','mm','d'
unix_timestamp(timestamp=None,format='yyyy-MM-dd HH:mm:ss')
:给定一个unix timestamp
(单位为秒),将其转换为指定格式的字符串。使用默认的时区和默认的locale
。
如果转换失败,返回null
。
如果timestamp=None
,则返回当前的timestamp
。
参数:
timestamp
:一个unix
时间戳列。format
:指定转换的格式 weekofyear(col)
: 返回给定时间是当年的第几周。返回一个整数。
year(col)
:从日期中抽取年份,返回一个整数。
count(col)
:计算每一组的元素的个数。
avg(col)
:计算指定列的均值
approx_count_distinct(col, rsd=None)
:统计指定列有多少个distinct
值
countDistinct(col,*cols)
:计算一列或者一组列中的distinct value
的数量。
collect_list(col)
:返回指定列的元素组成的列表(不会去重)
collect_set(col)
:返回指定列的元素组成的集合(去重)
first(col,ignorenulls=False)
:返回组内的第一个元素。
如果ignorenulls=True
,则忽略null
值,直到第一个非null
值。如果都是null
,则返回null
。
如果ignorenulls=False
,则返回组内第一个元素(不管是不是null
)
last(col,ignorenulls=False)
:返回组内的最后一个元素。
如果ignorenulls=True
,则忽略null
值,直到最后一个非null
值。如果都是null
,则返回null
。
如果ignorenulls=False
,则返回组内最后一个元素(不管是不是null
)
grouping(col)
:判断group by list
中的指定列是否被聚合。如果被聚合则返回1,否则返回 0。
grouping_id(*cols)
:返回grouping
的级别。
cols
必须严格匹配grouping columns
,或者为空(表示所有的grouping columns
)
kurtosis(col)
:返回一组元素的峰度
max(col)
:返回组内的最大值。
mean(col)
:返回组内的均值
min(col)
:返回组内的最小值
skewness(col)
: 返回组内的偏度
stddev(col)
:返回组内的样本标准差(分母除以 N-1
)
stddev_pop(col)
:返回组内的总体标准差(分母除以 N
)
stddev_samp(col)
: 返回组内的标准差,与stddev
相同
sum(col)
:返回组内的和
sumDistinct(col)
:返回组内distinct
值的和
var_pop(col)
:返回组内的总体方差。 (分母除以 N
)
var_samp(col)
:返回组内的样本方差 。(分母除以 N-1
)
variance(col)
:返回组内的总体方差,与var_pop
相同
.bitwiseNot(col)
:返回一个字符串列,它是旧列的比特级的取反。isnan(col)
:返回指定的列是否是NaN
isnull(col)
:返回指定的列是否为null
shiftLeft(col,numBites)
:按位左移指定的比特位数。shiftRight(col,numBites)
:按位右移指定的比特位数。shiftRightUnsigned(col,numBites)
:按位右移指定的比特位数。但是无符号移动。 asc(col)
:返回一个升序排列的Column
desc(col)
:返回一个降序排列的Column
col(col)
:返回值指定列组成的Column
column(col)
:返回值指定列组成的Column
window(timeColumn,windowDuration,slideDuration=None,startTime=None)
:将rows
划分到一个或者多个窗口中(通过timestamp
列)
参数:
timeColumn
:一个时间列,用于划分window
。它必须是pyspark.sql.types.TimestampType
windowDuration
: 表示时间窗口间隔的字符串。如 '1 second','1 day 12 hours','2 minutes'
。单位字符串可以为'week','day','hour','minute','second','millisecond','microsecond'
。slideDuration
: 表示窗口滑动的间隔,即:下一个窗口移动多少。如果未提供,则窗口为 tumbling windows
。 单位字符串可以为'week','day','hour','minute','second','millisecond','microsecond'
。startTime
:起始时间。它是1970-01-01 00:00:00
以来的相对偏移时刻。如,你需要在每个小时的15
分钟开启小时窗口,则它为15 minutes
: 12:15-13:15,13:15-14:15,...
返回值:返回一个称作window
的 struct
,它包含start,end
(一个半开半闭区间)
cume_dist()
:返回一个窗口中的累计分布概率。
dense_rank()
:返回窗口内的排名。(1,2,...
表示排名为1,2,...
)
它和rank()
的区别在于:dense_rank()
的排名没有跳跃(比如有3个排名为1,那么下一个排名是2,而不是下一个排名为4)
rank()
:返回窗口内的排名。(1,2,...
表示排名为1,2,...
)。
如有3个排名为1,则下一个排名是 4。
percent_rank()
:返回窗口的相对排名(如:百分比)
lag(col,count=1,default=None)
:返回当前行之前偏移行的值。如果当前行之前的行数小于count
,则返回default
值。
参数:
col
:一个字符串或者Column
。开窗的列count
:偏移行default
:默认值lead(col,count=1,default=None)
:返回当前行之后偏移行的值。如果当前行之后的行数小于count
,则返回default
值。
参数:
col
:一个字符串或者Column
。开窗的列count
:偏移行default
:默认值ntile(n)
:返回有序窗口分区中的ntile group id
(从 1 到 n
)
row_number()
: 返回一个序列,从 1 开始,到窗口的长度。
array(*cols)
:创新一个新的array
列。
参数:
cols
:列名字符串列表,或者Column
列表。要求这些列具有同样的数据类型示例:
xxxxxxxxxx
df.select(array('age', 'age').alias("arr"))
df.select(array([df.age, df.age]).alias("arr"))
array_contains(col, value)
:创建一个新列,指示value
是否在array
中(由col
给定)
其中col
必须是array
类型。而value
是一个值,或者一个Column
或者列名。
判断逻辑:
array
为null
,则返回null
;value
位于 array
中,则返回True
;value
不在 array
中,则返回False
示例:
xxxxxxxxxx
df = spark_session.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.select(array_contains(df.data, "a"))
create_map(*cols)
:创建一个map
列。
参数:
cols
:列名字符串列表,或者Column
列表。这些列组成了键值对。如(key1,value1,key2,value2,...)
示例:
xxxxxxxxxx
df.select(create_map('name', 'age').alias("map")).collect()
#[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
broadcast(df)
:标记df
这个Dataframe
足够小,从而应用于broadcast join
参数:
df
:一个 Dataframe
对象coalesce(*cols)
:返回第一个非null
的列组成的Column
。如果都为null
,则返回null
参数:
cols
:列名字符串列表,或者Column
列表。crc32(col)
:计算二进制列的CRC32
校验值。要求col
是二进制列。
explode(col)
:将一个array
或者 map
列拆成多行。要求col
是一个array
或者map
列。
示例:
xxxxxxxxxx
eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
# 结果为:[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
#结果为:
# +---+-----+
# |key|value|
# +---+-----+
# | a| b|
# +---+-----+
posexplode(col)
: 对指定array
或者map
中的每个元素,依据每个位置返回新的一行。
要求col
是一个array
或者map
列。
示例:
xxxxxxxxxx
eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(posexplode(eDF.intlist)).collect()
#结果为:[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
expr(str)
:计算表达式。
参数:
str
:一个表达式。如length(name)
from_json(col,schema,options={})
:解析一个包含JSON
字符串的列。如果遇到无法解析的字符串,则返回null
。
参数:
col
:一个字符串列,字符串是json
格式schema
:一个StructType
(表示解析一个元素),或者StructType
的 ArrayType
(表示解析一组元素)options
:用于控制解析过程。示例:
xxxxxxxxxx
from pyspark.sql.types import *
schema = StructType([StructField("a", IntegerType())])
df = spark_session.createDataFrame([(1, '{"a": 1}')], ("key", "value"))
df.select(from_json(df.value, schema).alias("json")).collect()
#结果为:[Row(json=Row(a=1))]
get_json_object(col,path)
:从json
字符串中提取指定的字段。如果json
字符串无效,则返回null
.
参数:
col
:包含json
格式的字符串的列。path
:json
的字段的路径。示例:
xxxxxxxxxx
data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
df = spark_session.createDataFrame(data, ("key", "jstring"))
df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"),
get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
# 结果为:[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
greatest(*cols)
:返回指定的一堆列中的最大值。要求至少包含2列。
它会跳过null
值。如果都是null
值,则返回null
。
least(*cols)
:返回指定的一堆列中的最小值。要求至少包含2列。
它会跳过null
值。如果都是null
值,则返回null
。
json_tuple(col,*fields)
:从json
列中抽取字段组成新列(抽取n
个字段,则生成n
列)
参数:
col
:一个json
字符串列fields
:一组字符串,给出了json
中待抽取的字段lit(col)
:创建一个字面量值的列
monotonically_increasing_id()
:创建一个单调递增的id
列(64位整数)。
它可以确保结果是单调递增的,并且是unique
的,但是不保证是连续的。
它隐含两个假设:
dataframe
分区数量少于1 billion
8 billion
nanvl(col1,col2)
:如果col1
不是NaN
,则返回col1
;否则返回col2
。
要求col1
和 col2
都是浮点列(DoubleType
或者 FloatType
)
size(col)
:计算array/map
列的长度(元素个数)。
sort_array(col,asc=True)
: 对array
列中的array
进行排序(排序的方式是自然的顺序)
参数:
col
:一个字符串或者Column
, 指定一个array
列asc
: 如果为True
,则是升序;否则是降序spark_partition_id()
:返回一个partition ID
列
该方法产生的结果依赖于数据划分和任务调度,因此是未确定结果的。
struct(*cols)
:创建一个新的struct
列。
参数:
cols
:一个字符串列表(指定了列名),或者一个Column
列表示例:
xxxxxxxxxx
df.select(struct('age', 'name').alias("struct")).collect()
# [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
to_json(col,options={})
:将包含 StructType
或者Arrytype
的StructType
转换为json
字符串。如果遇到不支持的类型,则抛出异常。
参数:
col
:一个字符串或者Column
,表示待转换的列 options
:转换选项。它支持和json datasource
同样的选项udf(f=None,returnType=StringType)
:根据用户定义函数(UDF
) 来创建一列。
参数:
f
:一个python
函数,它接受一个参数returnType
:一个pyspqrk.sql.types.DataType
类型,表示udf
的返回类型示例:
xxxxxxxxxx
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
df.select(slen("name").alias("slen_name"))
when(condition,value)
: 对一系列条件求值,返回其中匹配的哪个结果。
如果Column.otherwise()
未被调用,则当未匹配时,返回None
;如果Column.otherwise()
被调用,则当未匹配时,返回otherwise()
的结果。
参数:
condition
:一个布尔列 value
:一个字面量值,或者一个Column
示例:
xxxxxxxxxx
df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
# [Row(age=3), Row(age=4)]