数据开发之离线计算_Hive数据倾斜解决方法
1.Hive数据倾斜原因
hivesql采用mapreduce分布式计算引擎进行海量数据处理,hive的数据倾斜就是就是由于数据分布不均匀,导致一个或几个节点处理的数据量比其他节点大很多。
在map和reduce两个阶段都有可能发生数据倾斜。一个MR计算任务中,数据文件在进入map阶段之前都会进行split,默认按128MB大小切分为数据块,分配给不同map任务读取。但是当输入文件使用了GZIP等不支持文件切分的压缩格式时,MR任务无法对输入文件进行split,该压缩文件只会被一个map任务所读取,如果有一个超大的不可切分的压缩文件被一个map任务读取,就会发生map阶段的数据倾斜。
相比于map阶段,reduce阶段更容易因为大key发生数据倾斜。因为从map到reduce的过程中会发生shuffle,当根据业务需求对key进行分区聚合(join/groupBy/countDistinct)时,在shuffle过程中会默认将key进行hash,然后将相同的key放入同一个reduce任务中进行计算处理。当数据中存在大量相同key,导致不同reduce节点数据量分配极度不均衡时,就会发生reduce阶段的数据倾斜。
2.定位HiveSQL中产生倾斜的代码段
首先通过yarn监控平台或者jobhistory监控平台来查看相关指标判断是不是数据倾斜问题:
1.通过map或reduce的执行时间elapsed time来判断,如果某个map或reduce的所有推测任务都明显比其他同阶段任务执行时间长,那就是数据倾斜问题。

2.进入task的监控界面点击查看每个task的counters信息,通过比较输入数据大小number of bytes read也可以进行判断,如果某个map或reduce任务明显比其他同阶段任务输入数据量大,那就是数据倾斜问题。

通过监控界面确定了发生倾斜的mapreduce任务和对应的jobname之后,可以从中看出hivesql的stageID,然后就可以查看该hivesql的执行计划,确定该stageID对应的是哪一段代码,从而对该段代码进行优化。
3.具体倾斜原因于对应解决办法
3.1 Map输入文件过大或过小
1.当map输入文件为过大的不可split压缩格式文件(GZIP等)时,会造成单个map任务读取数据过多,应首先考虑转换压缩格式(ZIP、BZIP2等)或手动切分输入文件。
2.map任务默认的input split是128MB,会将可分割的大文件按该大小进行分割读取,但是并不会将小文件进行合并再交给同一个map任务处理,而是为每一个小文件生成一个map任务。所以也要避免输入文件是大量的小文件,应先手动合并成大文件。
3.2 group by
group by引起的倾斜主要是输入数据行按照group by列值分布不均匀引起的。要解决该倾斜问题的思路就是分步聚合,将一个MR任务拆成多个MR任务,先将这些大key打散,使其分散到多个reduce解决中进行聚合计算,最后再将聚合过后变少的数据再聚合到一个reduce上进行计算。
实现实例:假设我们现在已知group by key = hello时是大key,那么在group by之前,我们可以给key=hello的数据加上0-9的随机数前缀,变成0-hello、1-hello…,此时做group by数据就会被分散到10个reduce中,然后再在上层查询中将添加的随机数前缀去掉,使其变回hello再做一次全局聚合。
通过实现实例看起来自己通过sql代码实现比较麻烦,实际上hive已经实现了相应的优化方法,用户只需要配置几个参数就行了。对于group by引起的倾斜,只需要设置如下三个参数,hive就会自动实现上述优化逻辑:
1 | set hive.map.aggr = true --开启map端提前聚合,就相当于开启combiner,减少数据传输及在reduce阶段的处理时间,默认是开启的。 |
3.3 count distinct
使用count distinct时会将map端所有的输出数据全部分布到一个reduce任务上去进行去重统计个数,这已经不是数据倾斜的问题了,而是只要数据量大就一定会放在一个reduce上运行导致性能较差。
对于这种问题,可以通过先group by再count的方式来进行优化,本质是通过增加一个group by的MR任务来将去重计算分散到多个reduce节点中进行。
优化实例如下:
1 | select count(distinct id) from table; |
3.4 大表join小表
正常的join操作是将map阶段生产的输出数据按照join key进行shuffle,将相同的join key数据按照hash分发到同一个reduce节点再进行join,当join key字段存在大key时就会发生数据倾斜。
对于大表join小表,hive提供了一种mapjoin的方法来解决数据倾斜的问题,就是将小表直接全量加载到每个map节点上,然后直接在map阶段直接在每个map节点上做好join操作,这样就直接避免了按照join key进行数据分发引起的倾斜。
通过设置如下hive参数即可使用mapjoin:
1 | set hive.auto.convert.join = true --开启map端join,默认就是开启的。 |
3.5 大表join大表
大表join大表产生倾斜的原因于上述大表join小表一致,存在热key或者大量空key时,导致数据分发倾斜。
对于大表join大表倾斜问题,hive提供了如下参数进行优化:
1 | set hive.optimize.skewjoin = true --开启join运行时优化。 |
join操作时遇到的数据倾斜也很有可能是空key造成的,inner join会自动过滤空key,其他join不需要空key数据时最好在join之前就先进行where key is not null过滤。
4.倾斜SQL优化实例
上一节中主要描述的都是通过hive参数的调整,使用hive内置的倾斜治理方案,那么也有一些直接通过sql改写来实现倾斜优化的方法,如下sql:
1 | t1 |
在上述sql中,t1与vender两表做join进行关联,因为t1.vender_id字段可能存在较多为空的情况,所以为了避免数据倾斜,当t1.vender_id为空时,通过concat(‘hive’,rand()),使用hive+随机数作为vender_id,然后再与vender.vender_id进行关联。
但是上述使用rand()的形式,当遇到部分节点失败或者fetch failure导致stage重算时,这种对同一个值进行随机划分的方式在特定组合下会产生重叠,这样就会导致最终结果数据出现数据重复或数据丢失现象的问题。所以更好的替代方案是使用md5加密唯一维度值或者直接使用主键进行空值划分,如下sql中假设t1的逐渐是”k1”:
1 | t1 |
这样即能打散空关联字段,又能避免rand()带来的fetch failure重试不确定性。
5.大表join大表的倾斜问题解决
5.1 局部聚合+全局聚合
该方案适用于对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合,且指标支持分步聚合的场景。通过将原本相同的key附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题,最后再去除掉随机前缀,进行全局聚合,就可以得到最终的结果。但如果是join类的shuffle操作,还得用其他的解决方案。
5.2 采样倾斜key并分拆join操作
如果两个RDD/hive表进行join的时候,数据量都比较大,其中某一个RDD/hive表中的少数几个key的数据量过大,而另一个RDD/hive表中的所有key都分布比较均匀,可以将数据量大的几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join。此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task上去join了,最后将结果使用union算子合并起来即可。但如果导致倾斜的key特别多的话,不适合该方案。
5.3 使用随机前缀和扩容RDD进行join
如果在进行join操作的时候,RDD中有大量的key导致数据倾斜,可以将该RDD的每条数据都打上一个n以内的随机前缀,同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀,最后将两个处理后的RDD进行join即可。该方案与上一种方案的不同之处在于,上种方案是尽量只对少数倾斜key对应的数据进行特殊处理(扩容RDD),对内存的占用并不大;而该方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,只能对整个RDD进行数据扩容,对内存资源要求很高。
实现步骤:
对左表Key添加随机前缀:将左表的关联Key拼接一个随机前缀(如
0~N),使得原本相同的Key被分散到多个不同Key中1
2
3
4SELECT
CONCAT(key, '_', CAST(FLOOR(RAND() * N) AS STRING)) AS salted_key,
value
FROM left_table;(N为随机前缀的范围,例如5~10,根据数据倾斜程度调整)
对右表Key进行膨胀:将右表的关联Key复制多份,每份拼接不同的前缀(0到N),确保能与左表的所有随机前缀匹配
1
2
3
4
5SELECT
CONCAT(key, '_', CAST(tmp.prefix AS STRING)) AS salted_key,
value
FROM right_table
LATERAL VIEW EXPLODE(SPLIT(REPEAT('0,1,2,...,N', 1), ',')) tmp AS prefix;(通过LATERAL VIEW和EXPLODE生成膨胀后的Key)
执行加盐后的Join:对处理后的左表和右表按salted_key进行Join,此时数据会被均匀分布到多个Reduce节点
1
2
3
4
5
6
7SELECT
LEFT.key,
LEFT.value,
RIGHT.value
FROM salted_left_table LEFT
JOIN salted_right_table RIGHT
ON LEFT.salted_key = RIGHT.salted_key;最终去盐聚合:根据原始Key对结果去盐聚合(如SUM、MAX等)
1
2
3
4
5SELECT
SUBSTR(salted_key, 1, LENGTH(salted_key)-2) AS original_key,
SUM(value)
FROM joined_result
GROUP BY SUBSTR(salted_key, 1, LENGTH(salted_key)-2);