For example, suppose a job has 1,000 tasks in total: 997 finish within 1 minute, but the remaining three take one or two hours each. This situation is very common.
Most tasks execute very quickly, but a few are extremely slow.
Because the progress of the entire Spark job is determined by the task with the longest running time, these few slow tasks hold up the whole job.
When performing a shuffle, records with the same key on every node must be pulled to a single task on one node for processing, for example to aggregate or join by key.
For example, most keys correspond to 10 records, but a few individual keys correspond to 1 million records. Most tasks are then assigned only about 10 records and finish in 1 second, while the few tasks that receive 1 million records run for one or two hours.
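The effect of a hot key can be reproduced without a cluster. The following is a minimal plain-Python sketch, not Spark code: it assumes records are routed with `hash(key) % num_tasks`, which only mimics hash partitioning, and the data set and the helper name `records_per_task` are made up for illustration.

```python
from collections import Counter

def records_per_task(keys, num_tasks):
    """Count how many records each shuffle read task would receive.

    Assumption: a record goes to task hash(key) % num_tasks, which
    mimics (but is not) Spark's hash partitioning.
    """
    load = Counter()
    for key in keys:
        load[hash(key) % num_tasks] += 1
    return load

# Hypothetical data set: 1,000 ordinary keys with 10 records each,
# plus one hot key (0) with 100,000 records.
keys = [k for k in range(1, 1001) for _ in range(10)]
keys += [0] * 100_000

for num_tasks in (10, 100, 1000):
    load = records_per_task(keys, num_tasks)
    print(num_tasks, "tasks -> largest task holds", max(load.values()), "records")
```

Whatever the number of tasks, all records of the hot key land in the same task, so the largest task never drops below 100,000 records in this sketch.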
When data skew occurs, it may have been caused by one of these shuffle-triggering operators:
|No.||Operators that trigger shuffle|
|5.||join, cogroup, repartition, etc.|
The first thing to determine is which stage the data skew occurs in.
- yarn-client submit: the log is available locally, and you can find in it which stage is currently running;
- yarn-cluster submit: use the Spark Web UI to see which stage the job has currently reached.
In either yarn-client or yarn-cluster mode, we can examine on the Spark Web UI the amount of data allocated to each task of this stage, and so determine whether data distributed unevenly across tasks is causing the skew.
Once we know which stage the data skew occurs in, we need to work out which part of the code corresponds to that stage, based on the principle of stage division.
Solution: wherever the Spark code contains a shuffle operator, or a Spark SQL statement contains a clause that causes a shuffle (such as group by), the preceding and following stages are divided at that point.
val conf = new SparkConf()
In the entire code, only the reduceByKey operator causes a shuffle, so the stages before and after it are divided there.
* Stage 0 mainly performs the operations from textFile to map, followed by the shuffle write.
The shuffle write can be understood simply as partitioning the data in the pairs RDD: within the data processed by each task, records with the same key are written to the same disk file.
* Stage 1 mainly performs the operations from reduceByKey to collect.
When each task of stage 1 starts to run, it first performs a shuffle read. The task pulls the keys it is responsible for from the nodes where the tasks of stage 0 ran, and then performs a global aggregation or join on each key. Here, the values of each key are accumulated.
After stage 1 executes the reduceByKey operator, the final wordCounts RDD has been computed, and the collect operator then pulls all the data to the Driver for us to traverse and print.
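The two stages can be imitated in plain Python to make the shuffle write and shuffle read steps concrete. This is only a sketch under stated assumptions: the function names `stage0_task` and `stage1_task`, the constant `NUM_REDUCERS`, and the sample text are hypothetical, in-memory lists stand in for disk files, and nothing here is Spark's actual implementation.

```python
from collections import defaultdict

NUM_REDUCERS = 2  # hypothetical number of stage 1 (shuffle read) tasks

def stage0_task(lines):
    """Map side: split lines into (word, 1) pairs, then shuffle write.

    Each pair goes into the bucket chosen by hash(word) % NUM_REDUCERS,
    so within one task the same key always lands in the same bucket
    (a bucket stands in for a disk file).
    """
    buckets = defaultdict(list)
    for line in lines:
        for word in line.split():
            buckets[hash(word) % NUM_REDUCERS].append((word, 1))
    return buckets

def stage1_task(reducer_id, all_buckets):
    """Reduce side: shuffle read from every stage 0 task, then sum per key."""
    counts = defaultdict(int)
    for buckets in all_buckets:
        for word, one in buckets.get(reducer_id, []):
            counts[word] += one
    return dict(counts)

# Two input splits, one per stage 0 task (made-up sample text).
splits = [["hello world", "hello Singapore"], ["Singapore Singapore China"]]
shuffle_files = [stage0_task(split) for split in splits]

# "collect": gather the results of all stage 1 tasks on the driver.
word_counts = {}
for reducer_id in range(NUM_REDUCERS):
    word_counts.update(stage1_task(reducer_id, shuffle_files))
print(sorted(word_counts.items()))
```

Because every occurrence of a word is routed to the same reducer, a word that dominates the input makes that one stage 1 task do most of the work.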
|No.||How to view the distribution of the keys that cause data skew|
|1.||If the data skew is caused by group by or join statements in Spark SQL, query the key distribution of the tables used in the SQL.|
|2.||If the data skew is caused by a shuffle operator on a Spark RDD, you can view the key distribution from within the Spark job.|
For example, in Word Count we can first sample 10% of pairs, then use the countByKey operator to count the occurrences of each key, and finally traverse and print the occurrences of each key in the sample on the client.
val sampledPairs = pairs.sample(false, 0.1)
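The same sample-then-count idea can be tried in plain Python before touching the job. The sketch below is an illustration, not the Spark API: `sample_key_counts` is a hypothetical helper that keeps each record with probability `fraction` (similar in spirit to sampling without replacement) and then counts keys the way countByKey would; the input data is invented.

```python
import random
from collections import Counter

def sample_key_counts(pairs, fraction, seed=42):
    """Keep each pair with probability `fraction`, then count per key."""
    rng = random.Random(seed)
    sampled = [pair for pair in pairs if rng.random() < fraction]
    return Counter(key for key, _ in sampled)

# Hypothetical skewed input: one hot word and many rare ones.
pairs = [("Singapore", 1)] * 50_000 + [(f"word{i}", 1) for i in range(5_000)]

counts = sample_key_counts(pairs, fraction=0.1)
# Print the most frequent keys in the sample; a skewed key stands out at once.
for key, n in counts.most_common(3):
    print(key, n)
```

A 10% sample is usually enough to reveal which keys dominate, while keeping the amount of data returned to the client small.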
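|No.||Solutions to data skew|
|1.||Increase the parallelism of shuffle operations
When executing a shuffle operator on an RDD, pass it a parameter, for example reduceByKey(1000); this parameter sets the number of shuffle read tasks used when the operator runs.
For shuffle-type statements in Spark SQL, such as group by and join, set the parameter spark.sql.shuffle.partitions, which controls the parallelism of the shuffle read tasks. Its default value is 200, which is somewhat too small for many scenarios.
Experience: this cannot completely solve data skew. For example, if a single key has 1 million records, all of those records are still assigned to one task no matter how many tasks there are.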
|2.||Two-stage aggregation (local aggregation + global aggregation)
Disadvantages: only solves skew in aggregation-type shuffle operations. If it is a shuffle operation of the
|3.||Convert reduce join to map join
Advantages: very effective for data skew caused by join operations, because no shuffle happens at all, and therefore no data skew.
Disadvantages: only suitable for
The driver and each Executor hold a full copy of the small RDD in memory.
If the RDD we broadcast is relatively large, such as 10 GB or more, memory overflow may occur. Therefore, this approach is not suitable when both tables are large.
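To illustrate solution 3, here is a minimal plain-Python sketch of a map-side join. It is an assumption-laden illustration rather than Spark code: the dict `country_names` stands in for a broadcast variable, and `map_join`, `orders`, and the sample values are hypothetical.

```python
def map_join(large_records, small_table):
    """Join on the map side by looking each key up in a broadcast copy.

    `small_table` stands in for a broadcast variable: a plain dict that
    every task can read locally, so no records are regrouped by key.
    """
    joined = []
    for key, value in large_records:
        if key in small_table:  # inner join: drop unmatched keys
            joined.append((key, (value, small_table[key])))
    return joined

# Hypothetical data: a skewed "large" side and a tiny lookup table.
orders = [("SG", 10), ("SG", 20), ("SG", 30), ("CN", 5), ("US", 7)]
country_names = {"SG": "Singapore", "CN": "China"}

print(map_join(orders, country_names))
```

Each record is joined where it already sits, so the hot key "SG" never has to be gathered into a single task. The cost is that the whole small table must fit in memory everywhere it is copied.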
WordCounts reduceByKey
# read data from text file and split each line into words
# count the occurrence of each word
[('0_Singapore', 3), ('2_bbb', 1), ('0_hello', 1), ('2_haha', 1), ('0_world', 1), ('1_ShangHai', 1), ('0_China', 1), ('2_Singapore', 4), ('1_Singapore', 1), ('2_hello', 1)]
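# strip the random prefix from each key so partial counts of the same word can be merged
words_recover = wordCounts.map(lambda x: (x[0][x[0].find('_') + 1:], x[1]))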
[('world', 1), ('ShangHai', 1), ('China', 1), ('Singapore', 8), ('bbb', 1)]
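The full two-stage aggregation can be sketched end to end in plain Python. This is an illustrative sketch, not the original job: `two_stage_count`, `NUM_SALTS`, and the word list are assumptions, and `Counter` stands in for the two reduceByKey passes.

```python
import random
from collections import Counter

NUM_SALTS = 3  # hypothetical number of random prefixes (0, 1, 2)

def two_stage_count(words, seed=0):
    rng = random.Random(seed)
    # Stage 1: prepend a random prefix, then aggregate locally per salted key.
    salted = Counter(f"{rng.randrange(NUM_SALTS)}_{w}" for w in words)
    # Stage 2: strip the prefix and aggregate the partial counts globally.
    final = Counter()
    for salted_key, partial in salted.items():
        word = salted_key[salted_key.find("_") + 1:]
        final[word] += partial
    return salted, final

words = ["Singapore"] * 8 + ["hello", "world", "ShangHai", "China", "bbb", "haha"]
salted, final = two_stage_count(words)
print(sorted(final.items()))
```

The random prefix spreads the records of a hot word over up to `NUM_SALTS` salted keys, so the first aggregation is shared among several tasks. The second aggregation only has to merge a handful of partial counts per word.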
Other Solution 1: Use Hive ETL to preprocess data
Other Solution 2: Filter a few keys that cause skew