RDD

一、概述

  1. RDD(弹性分布式数据集Resilient Distributed Dataset):Spark 中数据的核心抽象。

    • RDD 是不可变的分布式对象集合
    • 每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上
    • RDD 可以包含Python、Java、Scala 中任意类型的对象
  2. spark 中,RDD 相关的函数分为三类:

    • 创建RDD
    • 转换(transformation) 已有的RDD
    • 执行动作 (action) 来对RDD 求值

    在这些背后,spark 自动将RDD 中的数据分发到集群上,并将action 并行化执行。

  3. RDD 支持两类操作:

    • 转换操作(transformation) : 它会从一个RDD 生成一个新的RDD
    • 行动操作(action) :它会对RDD 计算出一个结果,并将结果返回到driver 程序中(或者把结果存储到外部存储系统,如HDFS 中)
  4. 如果你不知道一个函数时转换操作还是行动操作,你可以考察它的返回值:

    • 如果返回的是RDD,则是转换操作。如果返回的是其它数据类型,则是行动操作
  5. 转换操作和行动操作的区别在于:行动操作会触发实际的计算。

    • 你可以在任意时候定义新的RDD,但是Spark 只会惰性计算这些RDD :只有第一次在一个行动操作中用到时,才会真正计算

      • 所有返回RDD的操作都是惰性的(包括读取数据的sc.textFile() 函数)
    • 在计算RDD 时,它所有依赖的中间RDD 也会被求值

      • 通过完整的转换链,spark 只计算求值过程中需要的那些数据。
  6. 默认情况下,sparkRDD 会在你每次对它进行行动操作时重新计算。

    • 如果希望在多个行动操作中重用同一个RDD,则可以使用RDD.persist()spark 把这个RDD 缓存起来

    • 在第一次对持久化的RDD 计算之后,Spark 会把RDD 的内容保存到内存中(以分区的方式存储到集群的各个机器上)。然后在此后的行动操作中,可以重用这些数据

    • 之所以默认不缓存RDD的计算结果,是因为:spark 可以直接遍历一遍数据然后计算出结果,没必要浪费存储空间。

  7. 每个Spark 程序或者shell 会话都按照如下流程工作:

    • 从外部数据创建输入的RDD
    • 使用诸如filter() 这样的转换操作对RDD 进行转换,以定义新的RDD
    • 对需要被重用的中间结果RDD 执行persist() 操作
    • 使用行动操作(如count() 等) 触发一次并行计算,spark 会对计算进行优化之后再执行
  8. spark 中的大部分转化操作和一部分行动操作需要用户传入一个可调用对象。在python 中,有三种方式:lambda 表达式、全局定义的函数、局部定义的函数

    • 注意:python 可能会把函数所在的对象也序列化之后向外传递。

      当传递的函数是某个对象的成员,或者包含了某个对象中一个字段的引用(如self.xxx 时),spark 会把整个对象发送到工作节点上。

      • 如果python 不知道如何序列化该对象,则程序运行失败
      • 如果该序列化对象太大,则传输的数据较多
    • 解决方案是:将你需要的字段从对象中取出,放到一个局部变量中:

  9. python中,如果操作对应的RDD 数据类型不正确,则导致运行报错。

二、创建 RDD

2.1 通用RDD

  1. 用户可以通过两种方式创建RDD

    • 读取一个外部数据集。

    • driver 程序中分发driver 程序中的对象集合(如list 或者set

      • 这种方式通常仅仅用于开发和测试。在生产环境中,它很少用。因为这种方式需要将整个数据集先放到driver 程序所在的机器的内存中。

2.2 Pair RDD

  1. 键值对RDD 的元素通常是一个二元元组(而不是单个值)

    • 键值对RDD 也被称作Pair RDD
    • 键值对RDD常常用于聚合计算
    • spark 为键值对RDD 提供了并行操作各个键、跨节点重新进行数据分组的接口
  2. Pair RDD 的创建:

    • 通过对常规RDD 执行转化来创建Pair RDD

      • 我们从常规RDD 中抽取某些字段,将该字段作为Pair RDD的键
    • 对于很多存储键值对的数据格式,当读取该数据时,直接返回由其键值对数据组成的Pair RDD

    • 当数据集已经在内存时,如果数据集由二元元组组成,那么直接调用sc.parallelize() 方法就可以创建Pair RDD

三、转换操作

  1. 转换操作(transformation) 会从一个RDD 生成一个新的RDD

    • 在这个过程中并不会求值。求值发生在action 操作中
    • 在这个过程中并不会改变输入的RDDRDD 是不可变的),而是创建并返回一个新的RDD
  2. spark 会使用谱系图来记录各个RDD 之间的依赖关系

    • 在对RDD 行动操作中,需要这个依赖关系来按需计算每个中间RDD
    • 当持久化的RDD 丢失部分数据时,也需要这个依赖关系来恢复丢失的数据

3.1 通用转换操作

  1. 基本转换操作:

    • .map(f, preservesPartitioning=False) :将函数 f 作用于当前RDD的每个元素,返回值构成新的RDD

      • preservesPartitioning:如果为True,则新的RDD 保留旧RDD 的分区
    • .flatMap(f, preservesPartitioning=False) :将函数 f 作用于当前RDD的每个元素,将返回的迭代器的内容构成了新的RDD

      • flatMap 可以视作:将返回的迭代器扁平化
    • .mapPartitions(f, preservesPartitioning=False):将函数 f 作用于当前RDD的每个分区,将返回的迭代器的内容构成了新的RDD

      这里f 函数的参数是一个集合(表示一个分区的数据)

    • .mapPartitionsWithIndex(f, preservesPartitioning=False):将函数 f 作用于当前RDD的每个分区以及分区id,将返回的迭代器的内容构成了新的RDD

      • 这里f 函数的参数是f分区id 以及一个集合(表示一个分区的数据)

      示例:

    • .filter(f):将函数f(称作过滤器) 作用于当前RDD的每个元素,通过f 的那些元素构成新的RDD

    • .distinct(numPartitions=None):返回一个由当前RDD 元素去重之后的结果组成新的RDD

      • numPartitions:指定了新的RDD 的分区数
    • sample(withReplacement, fraction, seed=None) :对当前RDD 进行采样,采样结果组成新的RDD

      • withReplacement:如果为True,则可以重复采样;否则是无放回采样

      • fractions:新的RDD 的期望大小(占旧RDD的比例)。spark 并不保证结果刚好满足这个比例(只是一个期望值)

        • 如果withReplacement=True:则表示每个元素期望被选择的次数
        • 如果withReplacement=False:则表示每个元素期望被选择的概率
      • seed:随机数生成器的种子

    • .sortBy(keyfunc, ascending=True, numPartitions=None):对当前RDD 进行排序,排序结果组成新的RDD

      • keyfunc:自定义的比较函数
      • ascending:如果为True,则升序排列
    • .glom():返回一个RDD,它将旧RDD 每个分区的元素聚合成一个列表,作为新RDD 的元素

    • .groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):返回一个分组的RDD

      示例:

  2. 针对两个RDD的转换操作:

    尽管RDD 不是集合,但是它也支持数学上的集合操作。注意:这些操作都要求被操作的RDD 是相同数据类型的。

    • .union(other):合并两个RDD 中所有元素,生成一个新的RDD

      • other:另一个RDD

      该操作并不会检查两个输入RDD 的重复元素,只是简单的将二者合并(并不会去重)。

    • .intersection(other):取两个RDD 元素的交集,生成一个新的RDD

      该操作会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。

    • .subtract(other, numPartitions=None):存在于第一个RDD 而不存在于第二个RDD 中的所有元素组成的新的RDD

      该操作也会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。

    • .cartesian(other):两个RDD 的笛卡尔积,生成一个新的RDD

      RDD 中的元素是元组 (a,b),其中 a 来自于第一个RDDb 来自于第二个RDD

      • 注意:求大规模的RDD 的笛卡尔积开销巨大
      • 该操作不会保证结果是去重的,它并不需要网络混洗数据。
  3. .keyBy(f):创建一个RDD,它的元素是元组(f(x),x)

    示例:

  4. .pipe(command, env=None, checkCode=False):返回一个RDD,它由外部进程的输出结果组成。

    • 参数:

      • command:外部进程命令
      • env:环境变量
      • checkCode:如果为True,则校验进程的返回值
  5. .randomSplit(weights, seed=None):返回一组新的RDD,它是旧RDD 的随机拆分

    • 参数:

      • weights:一个double的列表。它给出了每个结果DataFrame 的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0
      • seed:随机数种子
  6. .zip(other):返回一个Pair RDD,其中键来自于self,值来自于other

    • 它假设两个RDD 拥有同样数量的分区,且每个分区拥有同样数量的元素
  7. .zipWithIndex():返回一个Pair RDD,其中键来自于self,值就是键的索引。

  8. .zipWithUniqueId():返回一个Pair RDD,其中键来自于self,值是一个独一无二的id

    它不会触发一个spark job,这是它与zipWithIndex 的重要区别。

3.2 Pair RDD转换操作

  1. Pair RDD 可以使用所有标准RDD 上的可用的转换操作

    • 由于Pair RDD 的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
  2. 基本转换操作:

    • .keys():返回一个新的RDD,包含了旧RDD 每个元素的键

    • .values():返回一个新的RDD,包含了旧RDD 每个元素的值

    • .mapValues(f):返回一个新的RDD,元素为 [K,f(V)](保留原来的键不变,通过f 改变值)。

    • .flatMapValues(f):返回一个新的RDD,元素为 [K,f(V)](保留原来的键不变,通过f 改变值)。它与.mapValues(f) 区别见下面的示例:

    • .sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7f51f1ab5050>):对当前Pair RDD 进行排序,排序结果组成新的RDD

      • keyfunc:自定义的比较函数
      • ascending:如果为True,则升序排列
    • .sampleByKey(withReplacement, fractions, seed=None):基于键的采样(即:分层采样)

      • 参数:

        • withReplacement:如果为True,则是有放回的采样;否则是无放回的采样
        • fractions:一个字典,指定了键上的每个取值的采样比例(不同取值之间的采样比例无关,不需要加起来为1)
        • seed:随机数种子
    • .subtractByKey(other, numPartitions=None):基于键的差集。返回一个新的RDD,其中每个(key,value) 都位于self 中,且不在other

  3. 基于键的聚合操作:

    在常规RDD上,fold()、aggregate()、reduce() 等都是行动操作。在Pair RDD 上,有类似的一组操作,用于针对相同键的元素进行聚合。这些操作返回RDD,因此是转化操作而不是行动操作。

    返回的新RDD 的键为原来的键,值为针对键的元素聚合的结果。

    • .reduceByKey(f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):合并具有相同键的元素。f 作用于同一个键的那些元素的值。

      • 它为每个键进行并行的规约操作,每个规约操作将键相同的值合并起来
      • 因为数据集中可能有大量的键,因此该操作返回的是一个新的RDD:由键,以及对应的规约结果组成
    • .foldByKey(zeroValue,f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):通过f聚合具有相同键的元素。其中zeroValue 为零值。参见.fold()

    • .aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):通过f聚合具有相同键的元素。其中zeroValue 为零值。参见.aggregate()

    • .combineByKey(createCombiner,mergeValue,mergeCombiners, numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):它是最为常用的基于键的聚合函数,大多数基于键的聚合函数都是用它实现的。

      aggregate() 一样,combineByKey() 可以让用户返回与输入数据类型不同的返回值。

      你需要提供三个函数:

      • createCombiner(v)v 表示键对应的值。返回一个C 类型的值(表示累加器)
      • mergeValue(c,v)c 表示当前累加器,v 表示键对应的值。返回一个C 类型的值(表示更新后的累加器)
      • mergeCombiners(c1,c2)c1 表示某个分区某个键的累加器,c2 表示同一个键另一个分区的累加器。返回一个C 类型的值(表示合并后的累加器)

      其工作流程是:遍历分区中的所有元素。考察该元素的键:

      • 如果键从未在该分区中出现过,表明这是分区中的一个新的键。则使用createCombiner() 函数来创建该键对应的累加器的初始值。

        注意:这一过程发生在每个分区中,第一次出现各个键的时候发生。而不仅仅是整个RDD 中第一次出现一个键时发生。

      • 如果键已经在该分区中出现过,则使用mergeValue() 函数将该键的累加器对应的当前值与这个新的值合并

      • 由于每个分区是独立处理的,因此同一个键可以有多个累加器。如果有两个或者更多的分区都有同一个键的累加器,则使用mergeCombiners() 函数将各个分区的结果合并。

  4. 数据分组:

    • .groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):根据键来进行分组。

      • 返回一个新的RDD,类型为[K,Iterable[V]],其中K 为原来RDD 的键的类型,V 为原来RDD 的值的类型。
      • 如果你分组的目的是为了聚合,那么直接使用reduceByKey、aggregateByKey 性能更好。
    • .cogroup(other,numPartitions=None):它基于selfother 两个TDD 中的所有的键来进行分组,它提供了为多个RDD 进行数据分组的方法。

      • 返回一个新的RDD,类型为[K,(Iterable[V],Iterable[W])] 。其中K 为两个输入RDD 的键的类型,V 为原来self的值的类型,Wother 的值的类型。
      • 如果某个键只存在于一个输入RDD 中,另一个输入RDD 中不存在,则对应的迭代器为空。
      • 它是groupWith 的别名,但是groupWith 支持更多的TDD 来分组。
  5. 数据连接:

    数据连接操作的输出RDD 会包含来自两个输入RDD 的每一组相对应的记录。输出RDD 的类型为[K,(V,W)] ,其中K 为两个输入RDD 的键的类型,V 为原来self的值的类型,Wother 的值的类型。

    • .join(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的内连接。
    • .leftOuterJoin(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的左外连接。
    • .rightOuterJoin(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的右外连接。
    • .fullOuterJoin(other, numPartitions=None):执行right outer join

四、行动操作

  1. 行动操作(action) 会对RDD 计算出一个结果,并将结果返回到driver 程序中(或者把结果存储到外部存储系统,如HDFS 中)

    • 行动操作会强制执行依赖的中间RDD 的求值
  2. 每当调用一个新的行动操作时,整个RDD 都会从头开始计算

    • 要避免这种低效的行为,用户可以将中间RDD 持久化
  3. 在调用sc.textFile() 时,数据并没有读取进来,而是在必要的时候读取。

    • 如果未对读取的结果RDD 缓存,则该读取操作可能被多次执行
  4. spark 采取惰性求值的原因:通过惰性求值,可以把一些操作合并起来从而简化计算过程。

4.1 通用行动操作

  1. .reduce(f):通过 f 来聚合当前RDD

    • f 操作两个相同元素类型的RDD 数据,并且返回一个同样类型的新元素。
    • 该行动操作的结果得到一个值(类型与RDD中的元素类型相同)
  2. .fold(zeroValue,op):通过 op 聚合当前RDD

    该操作首先对每个分区中的元素进行聚合(聚合的第一个数由zeroValue 提供)。然后将分区聚合结果与zeroValue 再进行聚合。

    • f 操作两个相同元素类型的RDD 数据,并且返回一个同样类型的新元素。
    • 该行动操作的结果得到一个值(类型与RDD中的元素类型相同)

    zeroValue 参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。

  3. .aggregate(zeroValue,seqOp,combOp):该操作也是聚合当前RDD。聚合的步骤为:

    首先以分区为单位,对当前RDD 执行seqOp 来进行聚合。聚合的结果不一定与当前TDD 元素相同类型。

    然后以zeroValue 为初始值,将分区聚合结果按照combOp 来聚合(聚合的第一个数由zeroValue 提供),得到最终的聚合结果。

    • zeroValuecombOp 聚合函数的初始值。类型与最终结果类型相同
    • seqOp:分区内的聚合函数,返回类型与zeroValue 相同
    • combOp:分区之间的聚合函数。

    zeroValue 参与了分区元素聚合过程,也参与了分区聚合结果的再聚合过程。

    示例:取均值:

  4. 获取RDD 中的全部或者部分元素:

    • .collect():它将整个RDD的内容以列表的形式返回到driver 程序中。

      • 通常在测试中使用,且当RDD 内容不大时使用,要求所有结果都能存入单台机器的内存中
      • 它返回元素的顺序可能与你预期的不一致
    • .take(n):以列表的形式返回RDD 中的n 个元素到driver 程序中。

      • 它会尽可能的使用尽量少的分区
      • 它返回元素的顺序可能与你预期的不一致
    • .takeOrderd(n,key=None):以列表的形式按照指定的顺序返回RDD 中的n 个元素到driver 程序中。

      • 默认情况下,使用数据的降序。你也可以提供key 参数来指定比较函数
    • .takeSample(withReplacement,num,seed=None):以列表的形式返回对RDD 随机采样的结果。

      • withReplacement:如果为True,则可以重复采样;否则是无放回采样
      • num:预期采样结果的数量。如果是重复采样,则最终采样结果就是num。如果是无放回采样,则最终采样结果不能超过RDD 的大小。
      • seed:随机数生成器的种子
    • top(n,key=None):获取RDD 的前n个元素。

      • 默认情况下,它使用数据降序的 top n。你也可以提供key 参数来指定比较函数。
    • .first():获取RDD 中的第一个元素。

  5. 计数:

    • .count():返回RDD 的元素总数量(不考虑去重)

    • .countByValue():以字典的形式返回RDD 中,各元素出现的次数。

    • .histogram(buckets):计算分桶

      • 参数:

        • buckets:指定如何分桶。

          • 如果是一个序列,则它指定了桶的区间。如[1,10,20,50] 代表了区间[1,10) [10,20) [20,50](最后一个桶是闭区间)。该序列必须是排序好的,且不能包含重复数字,且至少包含两个数字。
          • 如果是一个数字,则指定了桶的数量。它会自动将数据划分到min~max 之间的、均匀分布的桶中。它必须大于等于1.
      • 返回值:一个元组 (桶区间序列,桶内元素个数序列)

      • 示例:

  6. .foreach(f):对当前RDD 的每个元素执行函数 f

    • 它与.map(f)不同。.map 是转换操作,而.foreach 是行动操作。
    • 通常.foreach 用于将RDD的数据以json 格式发送到网络服务器上,或者写入到数据库中。
  7. .foreachPartition(f):对当前RDD 的每个分区应用f

    示例:

  8. 统计方法:

    • .max(key=None):返回RDD 的最大值。

      • 参数:

        • key:对RDD 中的值进行映射,比较的是key(x) 之后的结果(但是返回的是x 而不是映射之后的结果)
    • .mean():返回RDD 的均值

    • .min(key=None):返回RDD 的最小值。

      • 参数:

        • key:对RDD 中的值进行映射,比较的是key(x) 之后的结果(但是返回的是x 而不是映射之后的结果)
    • .sampleStdev():计算样本标准差

    • .sampleVariance():计算样本方差

    • .stdev():计算标准差。它与样本标准差的区别在于:分母不同

    • .variance():计算方差。它与样本方差的区别在于:分母不同

    • .sum():计算总和

4.2 Pair RDD 行动操作

  1. Pair RDD 可以使用所有标准RDD 上的可用的行动操作

    • 由于Pair RDD 的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
  2. .countByKey():以字典的形式返回每个键的元素数量。

  3. .collectAsMap():以字典的形式返回所有的键值对。

  4. .lookup(key):以列表的形式返回指定键的所有的值。

五、其他方法和属性

  1. 属性:

    • .context:返回创建该RDDSparkContext
  2. .id():返回RDDID

  3. .isEmpty():当且仅当RDD 为空时,它返回True;否则返回False

  4. .name():返回RDD 的名字

  5. .setName(name):设置RDD 的名字

  6. .stats():返回一个StatCounter 对象,用于计算RDD 的统计值

  7. .toDebugString():返回一个RDD的描述字符串,用于调试

  8. .toLocalIterator():返回一个迭代器,对它迭代的结果就是对RDD的遍历。

六、持久化

  1. 如果简单的对RDD 调用行动操作,则Spark 每次都会重新计算RDD 以及它所有的依赖RDD

    • 在迭代算法中,消耗会格外大。因为迭代算法通常会使用同一组数据。
  2. 当我们让spark 持久化存储一个RDD 时,计算出RDD 的节点会分别保存它们所求出的分区数据。

    • 如果一个拥有持久化数据的节点发生故障,则spark 会在需要用到该缓存数据时,重新计算丢失的分区数据。
    • 我们也可以将数据备份到多个节点上,从而增加对数据丢失的鲁棒性。
  3. 我们可以为RDD 选择不同的持久化级别:在pyspark.StorageLevel 中:

    • MEMORY_ONLY:数据缓存在内存中。

      • 内存占用:高;CPU 时间:低;是否在内存:是;是否在磁盘中:否。
    • MEMORY_ONLY_SER:数据经过序列化之后缓存在内存中。

      • 内存占用:低;CPU 时间:高;是否在内存:是;是否在磁盘中:否。
    • MEMORY_AND_DISK:数据缓存在内存和硬盘中。

      • 内存占用:高;CPU 时间:中等;是否在内存:部分;是否在磁盘中:部分。
      • 如果数据在内存中放不下,则溢写到磁盘上。如果数据在内存中放得下,则缓存到内存中
    • MEMORY_AND_DISK_SER:数据经过序列化之后缓存在内存和硬盘中。

      • 内存占用:低;CPU 时间:高;是否在内存:部分;是否在磁盘中:部分。
      • 如果数据在内存中放不下,则溢写到磁盘上。如果数据在内存中放得下,则缓存到内存中
    • DISK_ONLY:数据缓存在磁盘中 。

      • 内存占用:低;CPU 时间:高;是否在内存:否;是否在磁盘中:是。

    如果在存储级别末尾加上数字 N,则表示将持久化数据存储为 N份。如:

    python 中,总是使用pickle library 来序列化对象,因此在python 中可用的存储级别有:

    MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2

  4. .persist(storageLevel=StorageLevel(False, True, False, False, 1)) :对当前RDD 进行持久化

    • 该方法调用时,并不会立即执行持久化,它并不会触发求值,而仅仅是对当前RDD 做个持久化标记。一旦该RDD 第一次求值时,才会发生持久化。
    • .persist() 的默认行为是:将数据以序列化的形式缓存在JVM 的堆空间中
  5. .cache():它也是一种持久化调用。

    • 它等价于.persist(MEMORY_ONLY)
    • 它不可设定缓存级别
  6. .unpersist():标记当前RDD 是未缓存的,并且将所有该RDD 已经缓存的数据从内存、硬盘中清除。

  7. 当要缓存的数据太多,内存放不下时,spark 会利用最近最少使用(LRU) 的缓存策略,把最老的分区从内存中移除。

    • 对于MEMORY_ONLY、MEMORY_ONLY_SER 级别:下一次要用到已经被移除的分区时,这些分区就要重新计算
    • 对于MEMORY_AND_DISK、MEMORY_AND_DISK_SER 级别:被移除的分区都会被写入磁盘。
  8. .getStorageLevel():返回当前的缓存级别

七、分区

7.1 基本概念

  1. 如果使用可控的分区方式,将经常被一起访问的数据放在同一个节点上,那么可以大大减少应用的通信开销。

    • 通过正确的分区,可以带来明显的性能提升
    • 为分布式数据集选择正确的分区,类似于为传统的数据集选择合适的数据结构
  2. 分区并不是对所有应用都是有好处的:如果给定的RDD 只需要被扫描一次,则我们完全没有必要对其预先进行分区处理。

    • 只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。
  3. Spark 中所有的键值对RDD 都可以进行分区。系统会根据一个针对键的函数对元素进行分组。

    • spark 可以确保同一个组的键出现在同一个节点上
  4. 许多spark 操作会自动为结果RDD 设定分区

    • sortByKey() 会自动生成范围分区的RDD
    • groupByKey() 会自动生成哈希分区的RDD

    其它还有join()、leftOuterJoin()、rightOuterJoin()、cogroup()、groupWith()、groupByKey()、reduceByKey()、combineByKey()、partitionBy(),以及mapValues()(如果输入RDD有分区方式)、flatMapValues()(如果输入RDD有分区方式)

    对于map()操作,由于理论上它可能改变元素的键,因此其结果不会有固定的分区方式。

    对于二元操作,输出数据的分区方式取决于输入RDD的分区方式

    • 默认情况下,结果采用哈希分区
    • 若其中一个输入RDD 已经设置过分区方式,则结果就使用该分区方式
    • 如果两个输入RDD 都设置过分区方式,则使用第一个输入的分区方式
  5. 许多spark 操作会利用已有的分区信息,如join()、leftOuterJoin()、rightOuterJoin()、cogroup()、groupWith()、groupByKey()、reduceByKey()、combineByKey()、lookup() 。 这些操作都能从分区中获得收益。

    • 任何需要将数据根据键跨节点进行混洗的操作,都能够从分区中获得好处

7.2 查看分区

  1. .getNumPartitions 属性可以查看RDD 的分区数

7.3 指定分区

  1. 在执行聚合或者分组操作时,可以要求Spark 使用指定的分区数(即numPartitions 参数)

    • 如果未指定该参数,则spark 根据集群的大小会自动推断出一个有意义的默认值
  2. 如果我们希望在除了聚合/分组操作之外,也能改变RDD 的分区。那么Spark 提供了.repartition() 方法

    • 它会把数据通过忘了进行混洗,并创建出新的分区集合
    • 该方法是代价比较大的操作,你可以通过.coalesce() 方法将RDD的分区数减少。它是一个代价相对较小的操作。
  3. .repartition(numPartitions):返回一个拥有指定分区数量的新的RDD

    • 新的分区数量可能比旧分区数增大,也可能减小。
  4. .coalesce(numPartitions,shuffle=False):返回一个拥有指定分区数量的新的RDD

    • 新的分区数量必须比旧分区数减小
  5. .partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):返回一个使用指定分区器和分区数量的新的RDD

    • 新的分区数量可能比旧分区数增大,也可能减小。
    • 这里partitionFunc 是分区函数。注意:如果你想让多个RDD 使用同一个分区方式,则应该使用同一个分区函数对象(如全局函数),而不要给每个RDD 创建一个新的函数对象。
  6. 对于重新调整分区的操作结果,建议对其持久化。

    • 如果未持久化,那么每次用到这个RDD时,都会重复地对数据进行分区操作,性能太差

八、混洗

  1. spark 中的某些操作会触发shuffle

  2. shufflespark 重新分配数据的一种机制,它使得这些数据可以跨不同区域进行分组

    • 这通常涉及到在executor 和驱动器程序之间拷贝数据,使得shuffle 成为一个复杂的、代价高昂的操作
  3. spark 里,特定的操作需要数据不会跨分区分布。如果跨分区分别,则需要混洗。

    reduceByKey 操作的过程为例。一个面临的挑战是:一个key 的所有值不一定在同一个分区里,甚至不一定在同一台机器里。但是它们必须共同被计算。

    为了所有数据都在单个reduceByKeyreduce 任务上运行,我们需要执行一个all-to-all 操作:它必须从所有分区读取所有的keykey 对应的所有的值,并且跨分区聚集取计算每个key 的结果。 这个过程叫做shuffle

  4. 触发混洗的操作包括:

    • 重分区操作,如repartition、coalesce 等等
    • ByKey 操作(除了countint 之外),如groupByKey、reduceByKey 等等
    • join操作,如cogroup、join 等等
  5. 混洗是一个代价比较高的操作,它涉及到磁盘IO,数据序列化,网络IO

    • 为了准备混洗操作的数据,spark 启动了一系列的任务:map 任务组织数据,reduce 完成数据的聚合。

      这些术语来自于MapReduce,与sparkmap,reduce 操作没有关系

      • map 任务的所有结果数据会保存在内存,直到内存不能完全存储为止。然后这些数据将基于目标分区进行排序,并写入到一个单独的文件中
      • reduce 任务将读取相关的已排序的数据块
    • 某些混洗操作会大量消耗堆内存空间,因为混洗操作在数据转换前后,需要使用内存中的数据结构对数据进行组织

    • 混洗操作还会在磁盘上生成大量的中间文件。

      • 这么做的好处是:如果spark 需要重新计算RDD的血统关系时,混洗操作产生的这些中间文件不需要重新创建