RDD
(弹性分布式数据集Resilient Distributed Dataset
):Spark
中数据的核心抽象。
RDD
是不可变的分布式对象集合RDD
都被分为多个分区,这些分区运行在集群中的不同节点上RDD
可以包含Python、Java、Scala
中任意类型的对象在spark
中,RDD
相关的函数分为三类:
RDD
transformation
) 已有的RDD
action
) 来对RDD
求值在这些背后,spark
自动将RDD
中的数据分发到集群上,并将action
并行化执行。
RDD
支持两类操作:
transformation
) : 它会从一个RDD
生成一个新的RDD
action
) :它会对RDD
计算出一个结果,并将结果返回到driver
程序中(或者把结果存储到外部存储系统,如HDFS
中)如果你不知道一个函数时转换操作还是行动操作,你可以考察它的返回值:
RDD
,则是转换操作。如果返回的是其它数据类型,则是行动操作转换操作和行动操作的区别在于:行动操作会触发实际的计算。
你可以在任意时候定义新的RDD
,但是Spark
只会惰性计算这些RDD
:只有第一次在一个行动操作中用到时,才会真正计算
RDD
的操作都是惰性的(包括读取数据的sc.textFile()
函数)在计算RDD
时,它所有依赖的中间RDD
也会被求值
spark
只计算求值过程中需要的那些数据。默认情况下,spark
的RDD
会在你每次对它进行行动操作时重新计算。
如果希望在多个行动操作中重用同一个RDD
,则可以使用RDD.persist()
让spark
把这个RDD
缓存起来
在第一次对持久化的RDD
计算之后,Spark
会把RDD
的内容保存到内存中(以分区的方式存储到集群的各个机器上)。然后在此后的行动操作中,可以重用这些数据
lines = sc.textFile("xxx.md")
lines.persist()
lines.count() #计算 lines,此时将 lines 缓存
lines.first() # 使用缓存的 lines
之所以默认不缓存RDD
的计算结果,是因为:spark
可以直接遍历一遍数据然后计算出结果,没必要浪费存储空间。
每个Spark
程序或者shell
会话都按照如下流程工作:
RDD
filter()
这样的转换操作对RDD
进行转换,以定义新的RDD
RDD
执行persist()
操作count()
等) 触发一次并行计算,spark
会对计算进行优化之后再执行spark
中的大部分转化操作和一部分行动操作需要用户传入一个可调用对象。在python
中,有三种方式:lambda
表达式、全局定义的函数、局部定义的函数
注意:python
可能会把函数所在的对象也序列化之后向外传递。
当传递的函数是某个对象的成员,或者包含了某个对象中一个字段的引用(如self.xxx
时),spark
会把整个对象发送到工作节点上。
python
不知道如何序列化该对象,则程序运行失败xxxxxxxxxx
class XXX:
def is_match(self,s):
return xxx
def get_xxx(self,rdd):
return rdd.filter(self.is_match) # bad! 传递的函数 self.is_match 是对象的成员
def get_yyy(self,rdd):
return rdd.filter(lambda x:self._x in x) #bad! 传递的函数包含了对象的成员 self._x
解决方案是:将你需要的字段从对象中取出,放到一个局部变量中:
xxxxxxxxxx
class XXX:
def is_match(self,s):
return xxx
def get_xxx(self,rdd):
_is_match = self.is_match
return rdd.filter(_is_match) # OK
def get_yyy(self,rdd):
_x = self._x
return rdd.filter(lambda x:_x in x) #OK
在python
中,如果操作对应的RDD
数据类型不正确,则导致运行报错。
用户可以通过两种方式创建RDD
:
读取一个外部数据集。
xxxxxxxxxx
lines = sc.textFile("xxx.md")
在driver
程序中分发driver
程序中的对象集合(如list
或者set
)
xxxxxxxxxx
lines = sc.parallelize([1,3,55,1])
driver
程序所在的机器的内存中。键值对RDD
的元素通常是一个二元元组(而不是单个值)
RDD
也被称作Pair RDD
RDD
常常用于聚合计算spark
为键值对RDD
提供了并行操作各个键、跨节点重新进行数据分组的接口Pair RDD
的创建:
通过对常规RDD
执行转化来创建Pair RDD
RDD
中抽取某些字段,将该字段作为Pair RDD
的键对于很多存储键值对的数据格式,当读取该数据时,直接返回由其键值对数据组成的Pair RDD
当数据集已经在内存时,如果数据集由二元元组组成,那么直接调用sc.parallelize()
方法就可以创建Pair RDD
转换操作(transformation
) 会从一个RDD
生成一个新的RDD
action
操作中RDD
(RDD
是不可变的),而是创建并返回一个新的RDD
spark
会使用谱系图来记录各个RDD
之间的依赖关系
RDD
行动操作中,需要这个依赖关系来按需计算每个中间RDD
RDD
丢失部分数据时,也需要这个依赖关系来恢复丢失的数据基本转换操作:
.map(f, preservesPartitioning=False)
:将函数 f
作用于当前RDD
的每个元素,返回值构成新的RDD
。
preservesPartitioning
:如果为True
,则新的RDD
保留旧RDD
的分区.flatMap(f, preservesPartitioning=False)
:将函数 f
作用于当前RDD
的每个元素,将返回的迭代器的内容构成了新的RDD
。
flatMap
可以视作:将返回的迭代器扁平化xxxxxxxxxx
lines = sc.parallelize(['hello world','hi'])
lines.map(lambda line:line.split(" ")) #新的RDD元素为[['hello','world'],['hi',]]
lines.flatMap(lambda line:line.split(" ")) #新的RDD元素为 ['hello','word','hi']
.mapPartitions(f, preservesPartitioning=False)
:将函数 f
作用于当前RDD
的每个分区,将返回的迭代器的内容构成了新的RDD
。
这里
f
函数的参数是一个集合(表示一个分区的数据)
.mapPartitionsWithIndex(f, preservesPartitioning=False)
:将函数 f
作用于当前RDD
的每个分区以及分区id
,将返回的迭代器的内容构成了新的RDD
。
f
函数的参数是f分区id
以及一个集合(表示一个分区的数据)示例:
xxxxxxxxxx
def f(splitIndex, iterator):
xxx
rdd.mapPartitionsWithIndex(f)
.filter(f)
:将函数f
(称作过滤器) 作用于当前RDD
的每个元素,通过f
的那些元素构成新的RDD
.distinct(numPartitions=None)
:返回一个由当前RDD
元素去重之后的结果组成新的RDD
。
numPartitions
:指定了新的RDD
的分区数sample(withReplacement, fraction, seed=None)
:对当前RDD
进行采样,采样结果组成新的RDD
withReplacement
:如果为True
,则可以重复采样;否则是无放回采样
fractions
:新的RDD
的期望大小(占旧RDD
的比例)。spark
并不保证结果刚好满足这个比例(只是一个期望值)
withReplacement=True
:则表示每个元素期望被选择的次数withReplacement=False
:则表示每个元素期望被选择的概率seed
:随机数生成器的种子
.sortBy(keyfunc, ascending=True, numPartitions=None)
:对当前RDD
进行排序,排序结果组成新的RDD
keyfunc
:自定义的比较函数ascending
:如果为True
,则升序排列.glom()
:返回一个RDD
,它将旧RDD
每个分区的元素聚合成一个列表,作为新RDD
的元素
.groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:返回一个分组的RDD
示例:
xxxxxxxxxx
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
#结果为: [(0, [2, 8]), (1, [1, 1, 3, 5])]
针对两个RDD
的转换操作:
尽管RDD
不是集合,但是它也支持数学上的集合操作。注意:这些操作都要求被操作的RDD
是相同数据类型的。
.union(other)
:合并两个RDD
中所有元素,生成一个新的RDD
。
other
:另一个RDD
该操作并不会检查两个输入RDD
的重复元素,只是简单的将二者合并(并不会去重)。
.intersection(other)
:取两个RDD
元素的交集,生成一个新的RDD
。
该操作会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。
.subtract(other, numPartitions=None)
:存在于第一个RDD
而不存在于第二个RDD
中的所有元素组成的新的RDD
。
该操作也会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。
.cartesian(other)
:两个RDD
的笛卡尔积,生成一个新的RDD
。
新RDD
中的元素是元组 (a,b)
,其中 a
来自于第一个RDD
,b
来自于第二个RDD
RDD
的笛卡尔积开销巨大.keyBy(f)
:创建一个RDD
,它的元素是元组(f(x),x)
。
示例:
xxxxxxxxxx
sc.parallelize(range(2,5)).keyBy(lambda x: x*x)
# 结果为:[(4, 2), (9, 3), (16, 4)]
.pipe(command, env=None, checkCode=False)
:返回一个RDD
,它由外部进程的输出结果组成。
参数:
command
:外部进程命令env
:环境变量checkCode
:如果为True
,则校验进程的返回值.randomSplit(weights, seed=None)
:返回一组新的RDD
,它是旧RDD
的随机拆分
参数:
weights
:一个double
的列表。它给出了每个结果DataFrame
的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0seed
:随机数种子.zip(other)
:返回一个Pair RDD
,其中键来自于self
,值来自于other
RDD
拥有同样数量的分区,且每个分区拥有同样数量的元素.zipWithIndex()
:返回一个Pair RDD
,其中键来自于self
,值就是键的索引。
.zipWithUniqueId()
:返回一个Pair RDD
,其中键来自于self
,值是一个独一无二的id
。
它不会触发一个spark job
,这是它与zipWithIndex
的重要区别。
Pair RDD
可以使用所有标准RDD
上的可用的转换操作
Pair RDD
的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。基本转换操作:
.keys()
:返回一个新的RDD
,包含了旧RDD
每个元素的键
.values()
:返回一个新的RDD
,包含了旧RDD
每个元素的值
.mapValues(f)
:返回一个新的RDD
,元素为 [K,f(V)]
(保留原来的键不变,通过f
改变值)。
.flatMapValues(f)
:返回一个新的RDD
,元素为 [K,f(V)]
(保留原来的键不变,通过f
改变值)。它与.mapValues(f)
区别见下面的示例:
xxxxxxxxxx
x=sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
x1=x.flatMapValues(lambda t:t).collect()
# x1: [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
x2=x.mapValues(lambda t:t).collect()
# x2: [("a", ["x", "y", "z"]), ("b", ["p", "r"])]
.sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7f51f1ab5050>)
:对当前Pair RDD
进行排序,排序结果组成新的RDD
keyfunc
:自定义的比较函数ascending
:如果为True
,则升序排列.sampleByKey(withReplacement, fractions, seed=None)
:基于键的采样(即:分层采样)
参数:
withReplacement
:如果为True
,则是有放回的采样;否则是无放回的采样fractions
:一个字典,指定了键上的每个取值的采样比例(不同取值之间的采样比例无关,不需要加起来为1)seed
:随机数种子.subtractByKey(other, numPartitions=None)
:基于键的差集。返回一个新的RDD
,其中每个(key,value)
都位于self
中,且不在other
中
基于键的聚合操作:
在常规RDD
上,fold()、aggregate()、reduce()
等都是行动操作。在Pair RDD
上,有类似的一组操作,用于针对相同键的元素进行聚合。这些操作返回RDD
,因此是转化操作而不是行动操作。
返回的新RDD
的键为原来的键,值为针对键的元素聚合的结果。
.reduceByKey(f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:合并具有相同键的元素。f
作用于同一个键的那些元素的值。
RDD
:由键,以及对应的规约结果组成.foldByKey(zeroValue,f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:通过f
聚合具有相同键的元素。其中zeroValue
为零值。参见.fold()
.aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:通过f
聚合具有相同键的元素。其中zeroValue
为零值。参见.aggregate()
.combineByKey(createCombiner,mergeValue,mergeCombiners, numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:它是最为常用的基于键的聚合函数,大多数基于键的聚合函数都是用它实现的。
和aggregate()
一样,combineByKey()
可以让用户返回与输入数据类型不同的返回值。
你需要提供三个函数:
createCombiner(v)
:v
表示键对应的值。返回一个C
类型的值(表示累加器)mergeValue(c,v)
:c
表示当前累加器,v
表示键对应的值。返回一个C
类型的值(表示更新后的累加器)mergeCombiners(c1,c2)
:c1
表示某个分区某个键的累加器,c2
表示同一个键另一个分区的累加器。返回一个C
类型的值(表示合并后的累加器)其工作流程是:遍历分区中的所有元素。考察该元素的键:
如果键从未在该分区中出现过,表明这是分区中的一个新的键。则使用createCombiner()
函数来创建该键对应的累加器的初始值。
注意:这一过程发生在每个分区中,第一次出现各个键的时候发生。而不仅仅是整个RDD
中第一次出现一个键时发生。
如果键已经在该分区中出现过,则使用mergeValue()
函数将该键的累加器对应的当前值与这个新的值合并
由于每个分区是独立处理的,因此同一个键可以有多个累加器。如果有两个或者更多的分区都有同一个键的累加器,则使用mergeCombiners()
函数将各个分区的结果合并。
数据分组:
.groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:根据键来进行分组。
RDD
,类型为[K,Iterable[V]]
,其中K
为原来RDD
的键的类型,V
为原来RDD
的值的类型。reduceByKey、aggregateByKey
性能更好。.cogroup(other,numPartitions=None)
:它基于self
和 other
两个TDD
中的所有的键来进行分组,它提供了为多个RDD
进行数据分组的方法。
RDD
,类型为[K,(Iterable[V],Iterable[W])]
。其中K
为两个输入RDD
的键的类型,V
为原来self
的值的类型,W
为other
的值的类型。RDD
中,另一个输入RDD
中不存在,则对应的迭代器为空。groupWith
的别名,但是groupWith
支持更多的TDD
来分组。数据连接:
数据连接操作的输出RDD
会包含来自两个输入RDD
的每一组相对应的记录。输出RDD
的类型为[K,(V,W)]
,其中K
为两个输入RDD
的键的类型,V
为原来self
的值的类型,W
为other
的值的类型。
.join(other,numPartitions=None)
:返回一个新的RDD
,它是两个输入RDD
的内连接。.leftOuterJoin(other,numPartitions=None)
:返回一个新的RDD
,它是两个输入RDD
的左外连接。.rightOuterJoin(other,numPartitions=None)
:返回一个新的RDD
,它是两个输入RDD
的右外连接。.fullOuterJoin(other, numPartitions=None)
:执行right outer join
行动操作(action
) 会对RDD
计算出一个结果,并将结果返回到driver
程序中(或者把结果存储到外部存储系统,如HDFS
中)
RDD
的求值每当调用一个新的行动操作时,整个RDD
都会从头开始计算
RDD
持久化在调用sc.textFile()
时,数据并没有读取进来,而是在必要的时候读取。
RDD
缓存,则该读取操作可能被多次执行spark
采取惰性求值的原因:通过惰性求值,可以把一些操作合并起来从而简化计算过程。
.reduce(f)
:通过 f
来聚合当前RDD
。
f
操作两个相同元素类型的RDD
数据,并且返回一个同样类型的新元素。RDD
中的元素类型相同).fold(zeroValue,op)
:通过 op
聚合当前RDD
该操作首先对每个分区中的元素进行聚合(聚合的第一个数由zeroValue
提供)。然后将分区聚合结果与zeroValue
再进行聚合。
f
操作两个相同元素类型的RDD
数据,并且返回一个同样类型的新元素。RDD
中的元素类型相同)
zeroValue
参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。
.aggregate(zeroValue,seqOp,combOp)
:该操作也是聚合当前RDD
。聚合的步骤为:
首先以分区为单位,对当前RDD
执行seqOp
来进行聚合。聚合的结果不一定与当前TDD
元素相同类型。
然后以zeroValue
为初始值,将分区聚合结果按照combOp
来聚合(聚合的第一个数由zeroValue
提供),得到最终的聚合结果。
zeroValue
:combOp
聚合函数的初始值。类型与最终结果类型相同seqOp
:分区内的聚合函数,返回类型与zeroValue
相同combOp
:分区之间的聚合函数。
zeroValue
参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。
示例:取均值:
xxxxxxxxxx
sum_count = nums.aggregate((0,0),
(lambda acc,value:(acc[0]+value,acc[1]+1),
(lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
)
return sum_count[0]/float(sum_count[1])
获取RDD
中的全部或者部分元素:
.collect()
:它将整个RDD
的内容以列表的形式返回到driver
程序中。
RDD
内容不大时使用,要求所有结果都能存入单台机器的内存中.take(n)
:以列表的形式返回RDD
中的n
个元素到driver
程序中。
.takeOrderd(n,key=None)
:以列表的形式按照指定的顺序返回RDD
中的n
个元素到driver
程序中。
key
参数来指定比较函数.takeSample(withReplacement,num,seed=None)
:以列表的形式返回对RDD
随机采样的结果。
withReplacement
:如果为True
,则可以重复采样;否则是无放回采样num
:预期采样结果的数量。如果是重复采样,则最终采样结果就是num
。如果是无放回采样,则最终采样结果不能超过RDD
的大小。seed
:随机数生成器的种子top(n,key=None)
:获取RDD
的前n
个元素。
top n
。你也可以提供key
参数来指定比较函数。.first()
:获取RDD
中的第一个元素。
计数:
.count()
:返回RDD
的元素总数量(不考虑去重)
.countByValue()
:以字典的形式返回RDD
中,各元素出现的次数。
.histogram(buckets)
:计算分桶
参数:
buckets
:指定如何分桶。
[1,10,20,50]
代表了区间[1,10) [10,20) [20,50]
(最后一个桶是闭区间)。该序列必须是排序好的,且不能包含重复数字,且至少包含两个数字。min~max
之间的、均匀分布的桶中。它必须大于等于1.返回值:一个元组 (桶区间序列,桶内元素个数序列)
示例:
xxxxxxxxxx
rdd = sc.parallelize(range(51))
rdd.histogram(2)
# 结果为 ([0, 25, 50], [25, 26])
rdd.histogram([0, 5, 25, 50])
#结果为 ([0, 5, 25, 50], [5, 20, 26])
.foreach(f)
:对当前RDD
的每个元素执行函数 f
。
.map(f)
不同。.map
是转换操作,而.foreach
是行动操作。.foreach
用于将RDD
的数据以json
格式发送到网络服务器上,或者写入到数据库中。.foreachPartition(f)
:对当前RDD
的每个分区应用f
示例:
xxxxxxxxxx
def f1(x):
print(x)
def f2(iterator):
for x in iterator:
print(x)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(f1)
rdd.foreachPartition(f2)
统计方法:
.max(key=None)
:返回RDD
的最大值。
参数:
key
:对RDD
中的值进行映射,比较的是key(x)
之后的结果(但是返回的是x
而不是映射之后的结果).mean()
:返回RDD
的均值
.min(key=None)
:返回RDD
的最小值。
参数:
key
:对RDD
中的值进行映射,比较的是key(x)
之后的结果(但是返回的是x
而不是映射之后的结果).sampleStdev()
:计算样本标准差
.sampleVariance()
:计算样本方差
.stdev()
:计算标准差。它与样本标准差的区别在于:分母不同
.variance()
:计算方差。它与样本方差的区别在于:分母不同
.sum()
:计算总和
Pair RDD
可以使用所有标准RDD
上的可用的行动操作
Pair RDD
的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。.countByKey()
:以字典的形式返回每个键的元素数量。
.collectAsMap()
:以字典的形式返回所有的键值对。
.lookup(key)
:以列表的形式返回指定键的所有的值。
属性:
.context
:返回创建该RDD
的 SparkContext
.id()
:返回RDD
的 ID
.isEmpty()
:当且仅当RDD
为空时,它返回True
;否则返回False
.name()
:返回RDD
的名字
.setName(name)
:设置RDD
的名字
.stats()
:返回一个StatCounter
对象,用于计算RDD
的统计值
.toDebugString()
:返回一个RDD
的描述字符串,用于调试
.toLocalIterator()
:返回一个迭代器,对它迭代的结果就是对RDD
的遍历。
如果简单的对RDD
调用行动操作,则Spark
每次都会重新计算RDD
以及它所有的依赖RDD
。
当我们让spark
持久化存储一个RDD
时,计算出RDD
的节点会分别保存它们所求出的分区数据。
spark
会在需要用到该缓存数据时,重新计算丢失的分区数据。我们可以为RDD
选择不同的持久化级别:在pyspark.StorageLevel
中:
MEMORY_ONLY
:数据缓存在内存中。
CPU
时间:低;是否在内存:是;是否在磁盘中:否。MEMORY_ONLY_SER
:数据经过序列化之后缓存在内存中。
CPU
时间:高;是否在内存:是;是否在磁盘中:否。MEMORY_AND_DISK
:数据缓存在内存和硬盘中。
CPU
时间:中等;是否在内存:部分;是否在磁盘中:部分。MEMORY_AND_DISK_SER
:数据经过序列化之后缓存在内存和硬盘中。
CPU
时间:高;是否在内存:部分;是否在磁盘中:部分。DISK_ONLY
:数据缓存在磁盘中 。
CPU
时间:高;是否在内存:否;是否在磁盘中:是。如果在存储级别末尾加上数字 N
,则表示将持久化数据存储为 N
份。如:
xxxxxxxxxx
MEMORY_ONLY_2 #表示对持久化数据存储为 2 份
在
python
中,总是使用pickle library
来序列化对象,因此在python
中可用的存储级别有:
MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2
.persist(storageLevel=StorageLevel(False, True, False, False, 1))
:对当前RDD
进行持久化
RDD
做个持久化标记。一旦该RDD
第一次求值时,才会发生持久化。.persist()
的默认行为是:将数据以序列化的形式缓存在JVM
的堆空间中.cache()
:它也是一种持久化调用。
.persist(MEMORY_ONLY)
.unpersist()
:标记当前RDD
是未缓存的,并且将所有该RDD
已经缓存的数据从内存、硬盘中清除。
当要缓存的数据太多,内存放不下时,spark
会利用最近最少使用(LRU
) 的缓存策略,把最老的分区从内存中移除。
MEMORY_ONLY、MEMORY_ONLY_SER
级别:下一次要用到已经被移除的分区时,这些分区就要重新计算MEMORY_AND_DISK、MEMORY_AND_DISK_SER
级别:被移除的分区都会被写入磁盘。.getStorageLevel()
:返回当前的缓存级别
如果使用可控的分区方式,将经常被一起访问的数据放在同一个节点上,那么可以大大减少应用的通信开销。
分区并不是对所有应用都是有好处的:如果给定的RDD
只需要被扫描一次,则我们完全没有必要对其预先进行分区处理。
Spark
中所有的键值对RDD
都可以进行分区。系统会根据一个针对键的函数对元素进行分组。
spark
可以确保同一个组的键出现在同一个节点上许多spark
操作会自动为结果RDD
设定分区
sortByKey()
会自动生成范围分区的RDD
groupByKey()
会自动生成哈希分区的RDD
其它还有join()、leftOuterJoin()、rightOuterJoin()、cogroup()、groupWith()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()
,以及mapValues()
(如果输入RDD
有分区方式)、flatMapValues()
(如果输入RDD
有分区方式)
对于map()
操作,由于理论上它可能改变元素的键,因此其结果不会有固定的分区方式。
对于二元操作,输出数据的分区方式取决于输入RDD
的分区方式
RDD
已经设置过分区方式,则结果就使用该分区方式RDD
都设置过分区方式,则使用第一个输入的分区方式许多spark
操作会利用已有的分区信息,如join()、leftOuterJoin()、rightOuterJoin()、cogroup()、groupWith()、groupByKey()、reduceByKey()、combineByKey()、lookup()
。 这些操作都能从分区中获得收益。
.getNumPartitions
属性可以查看RDD
的分区数在执行聚合或者分组操作时,可以要求Spark
使用指定的分区数(即numPartitions
参数)
spark
根据集群的大小会自动推断出一个有意义的默认值如果我们希望在除了聚合/分组操作之外,也能改变RDD
的分区。那么Spark
提供了.repartition()
方法
.coalesce()
方法将RDD
的分区数减少。它是一个代价相对较小的操作。.repartition(numPartitions)
:返回一个拥有指定分区数量的新的RDD
.coalesce(numPartitions,shuffle=False)
:返回一个拥有指定分区数量的新的RDD
.partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
:返回一个使用指定分区器和分区数量的新的RDD
partitionFunc
是分区函数。注意:如果你想让多个RDD
使用同一个分区方式,则应该使用同一个分区函数对象(如全局函数),而不要给每个RDD
创建一个新的函数对象。对于重新调整分区的操作结果,建议对其持久化。
RDD
时,都会重复地对数据进行分区操作,性能太差spark
中的某些操作会触发shuffle
shuffle
是spark
重新分配数据的一种机制,它使得这些数据可以跨不同区域进行分组
executor
和驱动器程序之间拷贝数据,使得shuffle
成为一个复杂的、代价高昂的操作在spark
里,特定的操作需要数据不会跨分区分布。如果跨分区分别,则需要混洗。
以reduceByKey
操作的过程为例。一个面临的挑战是:一个key
的所有值不一定在同一个分区里,甚至不一定在同一台机器里。但是它们必须共同被计算。
为了所有数据都在单个reduceByKey
的 reduce
任务上运行,我们需要执行一个all-to-all
操作:它必须从所有分区读取所有的key
和key
对应的所有的值,并且跨分区聚集取计算每个key
的结果。 这个过程叫做shuffle
。
触发混洗的操作包括:
repartition、coalesce
等等ByKey
操作(除了countint
之外),如groupByKey、reduceByKey
等等join
操作,如cogroup、join
等等混洗是一个代价比较高的操作,它涉及到磁盘IO
,数据序列化,网络IO
为了准备混洗操作的数据,spark
启动了一系列的任务:map
任务组织数据,reduce
完成数据的聚合。
这些术语来自于
MapReduce
,与spark
的map,reduce
操作没有关系
map
任务的所有结果数据会保存在内存,直到内存不能完全存储为止。然后这些数据将基于目标分区进行排序,并写入到一个单独的文件中reduce
任务将读取相关的已排序的数据块某些混洗操作会大量消耗堆内存空间,因为混洗操作在数据转换前后,需要使用内存中的数据结构对数据进行组织
混洗操作还会在磁盘上生成大量的中间文件。
spark
需要重新计算RDD
的血统关系时,混洗操作产生的这些中间文件不需要重新创建