抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >
image

1. 广播变量

image

线程可以共享变量的思路

广播变量:

(1) 在默认情况下,每一个task都会维持一个全局变量的副本

有一个集合:100M 在 driver 中生成, 但是在所有的task中都需要使用

那么,每一个 task 都会维持一个当前这 100M 数据的副本

如果一个 executor 中启动了 6 个 task,最终消耗 600M 内存

(2). 如果使用广播变量的话

那么可以把当前这个100M的数据,就编程一个广播变量的值

用 driver 中的 sparkcontext 进行 全局所有 executor 广播

最后的效果:每个 executor 中只存在一份这个广播变量的副本

而不是原先的每一个task都保持一个副本

所以最终的内存消耗量: 100m

(3) 最后的效果:

  1. 减少了网络数据传输的量
  2. 减少了executor的内存使用

如果一个值很小,那么几乎没有广播的必要。

广播的值的大小越大,效果越明显

2. 累加器

1
val a = sc.accumulator(0)

还原一个累加器

1
val b = a.value

spark 的累加器 和 mapreduce编程模型的全局计数器是一个道理。

3. DAG规划和基础理论

切分 Stage 是 从后往前找 shuffer类型/宽依赖的算子,遇到一个就断开,形成一个 stage

最后一个 stage: ResultStage

除此之外的stage

spark中如何划分stage

因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄赖就将这个RDD加入该stage中。

在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!

而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。

切分stage:

从后往前找 shuffle类型/宽依赖 的算子, 遇到一个就断开, 形成一个 stage

最后一个stage: ResultStage ------> ResultTask
除此之外的stage:ShffleMapStage ------> ShffleTask

每一个 stage 都会切成多个同种类型的 Task

每一个 Stage 中的有可能包含多个不同的 RDD
那么一个 Stage 又有可能会划分多个 task 执行
每个 RDD 又可以指定不同的分区数
默认情况下:每一个分区,就会是一个 Task

那么现在,如果遇到了一个 stage 中有多个不同分区数的RDD,
那么请问:到底这个stage中有多少个Task执行呢?

5 4 3 -----> 3个task

以最后一个RDD的分区数来决定

切分job:

从前往后找action算子, 找到一个就形成一个 job.

image

3 + 2 = 5 tasks

DAG 的生成

checkpoint linage

检查点 血脉 血统

容错

对于Spark任务中的宽窄依赖,我们只喜欢窄依赖

DAGScheduler:

  1. spark-submit 提交任务
  2. 初始化 DAGScheduler 和 TaskScheduler
  3. 接收到 application 之后,DAGScheduler 会首先把 application 抽象成一个 DAG
  4. DAGScheduler 对这个 DAG (DAG中的一个Job) 进行 stage 的切分
  5. 把每一个 stage 提交给 TaskScheduler

rdd1.collect

client 提交任务的任务节点

如果是client模式,那么 driver程序就在 client 节点
如果模式是 cluster, driver 程序在 worker 中.
rdd20.countByKey()

countByKey 是作用在 key-value 类型上的一个 action 算法
countByValue 一般是用来统计普通类型的RDD
map reduce recudeByKey filter, json

难点:

  1. aggregate
  2. aggregate

count sum max min distinct avg

100G ----> 1G
20G -----> 30G

map mapPartitions
join mapjoin reducejoin
cogroup

coalesce
repartition
repartitionAndSortWithinPartitions
重新分区, 并且分区内数据进行排序

6. RDD 持久化操作 cache, persist

cache:

正常情况下: 一个RDD中是不包含真实数据的,只包含描述这个RDD的源数据信息

如果对这个RDD调用 cache 方法

那么这个rdd中的数据,现在依然还是没有真实数据

直到第一次调用一个action的算子出发了这个RDD的数据生成,那么cache 操作

cache()
persist() == persist(StorageLevel.MEMORY_ONLY)
persist(StorageLevel.XXXX)

1
2
3
4
5
def cache(): this.type = persist()

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

persist() ==== persist(StorageLevel.MEMORY_ONLY)

一个普通的文件 file ===》 内存

该 file 被序列化了 ===》 内存

JVM 最大的区域是 Head 内存, OffHeap 堆外内存

7. union, join, subtract, cartesian

8. 各种 byKey 操作 (重要)

----union, join, coGroup, subtract, cartesian----

  1. groupByKey
  2. reduceByKey
  3. aggregateByKey
  4. sortByKey
  5. combineByKey

8.1 groupByKey

1
val data = List(("math", 89), ("hadoop", 100), ("math", 10), ("english", 89), ("math", 1000))

groupByKey把相同的key的数据分组到一个集合序列当中:

1
2
3
4
5
6
7
8
9
10
11
12
13
[
("spark",1),
("hive",1),
("spark",1),
("hadoop",1),
("hive",1)
]
-->
[
("spark",(1,1)),
("hive",(1,1)),
("hadoop",(1))
]

9. RDD mapPartitions, mapPartitionsWithIndex

mapPartitions 每次遍历一个分区 (最小单位是分区)

10. map, flatMap, filter

11. spark 基础概念复习

11.1 第1部分

第一天spark内容的部分残留

 WordCountJava7
 WordCountJava8

任务的提交:

(1). run-example SparkPi 100
(2). spark-shell
(3). spark-submit

–master:

local local[2] local\[\*\]
spark://hadoop02:7077, hadoop04:7077
yarn

HDFS 处理 myha01 这个 nameservice 的方式非常的暴力:

所有的请求,其实都会给每个 namenode 都发送, 但是只有 active 的 namenode才会进行处理,进行回复
if (namenode.getServiceStage() == "standby") {} else : ...

11.1 第2部分

核心功能: SparkContext, 存储体系, 执行引擎 DAGScheduler, 部署魔术

扩展功能: SQL, Streaming, GraphX, MLlib, SparkR, Pyspark

核心概念:

Application
Job 切分标准: 从前往后找action的算子
Stage 切分标准: 从后往前找宽依赖的算子
Task

image

Driver Application: 客户端驱动程序, 也可以理解为客户端应用程序,用于将任何程序转换为 RDD 和 DAG, 并与 Cluster Manager 进行通信与调度.

ClusterManager
Driver
Executor
Master, Worker, Client

deploy-mode 主要针对 yarn: client cluster

基本架构:

编程模型:

1). 获取编程入口 ---> SparkContext

2). 通过编程入口使用不同的方式加载数据得到一个数据抽象 ---> RDD

3). 针对加载得到的数据抽象调用不同的算子进行处理 ---> Transformation + Action

4). 针对结果数据进行处理 RDD/Scala 对象 或 集合 ---> print / save 存储

5). 关闭编程入口 ---> sparkContext.stop()

sparkSQL
sparkStreaming
&
sparkCore 一模一样
唯一不相同的地方就是: 编程入口, 数据抽象, 算子

RDD:

(1) 概念: 弹性分布式数据集, 不可变的, 可分区的分布式集合
(2) 属性:

  • A list of partitions
  • A function for computing on other RDDs
  • A list of dependencies on other RDDs
  • Optionally, a Partition for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

12. RDD 算子 sample, takeSample

Reference

Offer帮 英语学习包