使用 Hive 的时候最头疼的就是数据倾斜的问题,其实这个问题不光会体现在 Hive 上,目前大数据计算引擎都会面临这种问题。有问题就会有解决的方法,现在来一点一点分析数据为什么会倾斜,倾斜来到底怎样去优化。
数据倾斜数据量的倾斜,计算量的倾斜。数据倾斜也分为数据量的倾斜,计算量的倾斜。数据量的倾斜是指某一个节点上分配了比其它节点多数倍甚至数十倍的数据,计算量的倾斜是指每个节点上的数据量基本一致,但是某几个 key 计算的时间特别长,导致某几个 reduce 长时间计算。
map端优化
- 合并小文件
对输入小文件进行合并
1
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
map join
增加 map 数量的优化
比如说字段很少,一个 map 中包含大量的数据导致计算缓慢。
减少 map 数量的优化
比如每个 map 中块数据比较少,起了好多个 map 任务,每个 map 任务处理数据量都很小,需要减少 map 数量。
shuffle优化
shuffle 传输数据压缩优化
join 优化
group by 优化
reduce优化
合并小文件
压缩文件
对 Hive 运行参数的优化
hive.map.aggr
set hive.map.aggr=true;
开启在有 group by 查询的时候 map 端部分聚合,相当于 Combiner ,默认值 true
;
hive.optimize.skewjoin
set hive.optimize.skewjoin=true;
开启数据在 join 时倾斜的优化,该参数为在运行时动态指定数据进行 skew join,一般和 hive.skewjoin.key
参数一起使用, 默认值 false
。
hive.skewjoin.key
set hive.skewjoin.key=100000;
设置数据倾斜的阈值,如果在 join 中发现同样的 key 超过该值则认为是该 key 是倾斜的 join key,默认值 100000
。
hive-數據傾斜解決詳解 这篇文章对这个参数原理讲的比较详细。
对于
hive.skewjoin.key
,在执行 job 时,将它们存入临时的 HDFS 目录,其它数据正常执行;对倾斜数据开启 map join 操作,对非倾斜值采取普通 join 操作;
将倾斜数据集和非倾斜数据及进行合并操作。
hive.optimize.skewjoin.compiletime
set hive.optimize.skewjoin.compiletime=true;
在编译期开启数据在 join 时倾斜的优化只有在建表语句中指定了 skew key 这个参数才可以生效,默认值 false
。
可以通过如下建表语句指定 skewed key:
1
2
CREATE TABLE list_bucket_single (key STRING, value STRING)
SKEWED BY (key) ON (1, 5, 6) [STORED AS DIRECTORIES];
hive.groupby.skewindata
set hive.groupby.skewindata=true;
开启数据在 group by 时倾斜的优化, 默认值 false
。
hive数据倾斜优化策略 对这个参数的作用做了详细的介绍。
https://blog.csdn.net/hellojoy/article/details/82931827
hive.exec.reducers.bytes.per.reducer
set hive.exec.reducers.bytes.per.reducer=64000000;
设置每个 reduce 的大小,64000000 表示 64M,默认值 256000000
。
hive.exec.reducers.max
set hive.exec.reducers.max=1009;
设置 reduce 最大数量,如果参数 mapred.reduce.tasks
没有使用,Hive 将在自动分配使用 reduce 数量时候的最大 reduce 数量,默认值 1009
。
mapred.reduce.tasks
set mapred.reduce.task=100;
设置每个作业的 reduce 数量,如果是 -1 的话,系统则会自动分配 reduce 的数量,默认值 -1
。
默认是先设置 hive.exec.reducers.bytes.per.reducer
这个参数,设置了后 Hive 会自动计算 reduce 的个数,因此两个参数一般不同时使用。
hive.exec.parallel
set hive.exec.parallel=true;
开启任务并行执行,可以将没有依赖关系的 stage 并行执行,默认值 false
。
hive.exec.parallel.thread.number
set hive.exec.parallel.thread.number=8;
设置允许并行执行的最大任务数,默认值 8
。
hive.exec.compress.output
set hive.exec.compress.output=true;
开启最终输出数据压缩,压缩的格式和其它的选项在 Hadoop 设置参数 mapred.output.compress*
中,默认值 false
。
hive.exec.max.dynamic.partitions.pernode
set hive.exec.max.dynamic.partitions.pernode=366;
设置在 mapper/reducer 节点上允许最大动态分区的数量,默认值 100
。
hive.input.format
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
设置 Hive 默认输入格式,默认值 org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
。
hive.merge.mapfiles
set hive.merge.mapfiles=true;
开启在 map-only jobs 后合并文件,默认 true
。
hive.merge.mapredfiles
set hive.merge.mapredfiles=true;
开启在 map-reduce jobs 后合并文件,默认 false
。
hive.merge.sparkfiles
set hive.merge.sparkfiles=true;
开启在 Spark DAG 任务后合并文件,默认 false
。
hive.merge.smallfiles.avgsize
set hive.merge.smallfiles.avgsize=16000000;
设置输出文件平均大小阙值,如果输出文件平均大小小于这个值,则对于设置了 hive.merge.mapfiles=true
的 map-only jobs 和对设置了 hive.merge.mapredfiles=true
的 map-reduce job 开启小文件合并,默认值 16000000
。
hive.merge.size.per.task
set hive.merge.size.per.task=256000000;
设置文件合并的大小,默认值 256000000
。
hive.mapjoin.smalltable.filesize
set hive.smalltable.filesize=25000000;
设置小表大小的阙值,如果表大小小于该值,可以将 common join 尝试转换为 map join,默认值 25000000
。
hive.fetch.task.conversion
set hive.fetch.task.conversion=none;
设置禁用 Hive FETCH 模式,默认值 more
。
none
:禁用 FETCH TASK。minimal
:当使用SELECT *
、LIMIT
以及在分区列上执行过滤(WHERE
和HAVING
) 时,可以转换为 FETCH TASK。more
:当使用SELECT
,LIMIT
以及FILTER
时,可以转换为 FETCH TASK。可以在SELECT
子句中使用任何表达式,包括UDF。(UDTF和 Lateral views尚不支持)。
注意:使用默认设置,在查询大表的时候,limit 语句可能会出发 FETCH TASK,导致执行时间过长。这个时候可以禁用 FETCH TASK 使用 map-reduce 来进行查询
mapred.max.split.size
set mapred.max.split.size=256000000;
设置每个 map 最大输入大小,默认值 256000000
。
这个设置已经过时了,新的参数为 mapreduce.input.fileinputformat.split.maxsize
。
mapred.min.split.size.per.node
set mapred.min.split.size.per.node=1;
设置每个节点上可以处理最小文件的大小,默认值 1
。
这个设置已经过时了,新的参数为 mapreduce.input.fileinputformat.split.minsize.per.node
。
mapred.min.split.size.per.rack
set mapred.min.split.size.per.rack=1;
设置每个机架上可以处理最小文件的大小,默认值 1
。
这个设置已经过时了,新的参数为 mapreduce.input.fileinputformat.split.minsize.per.rack
。
对 Hive 语句的优化
对 join 的优化
1、对于join过程来说,如果出项较多的key值为空或异常的记录,或key值分布不均匀,就容易出现数据倾斜, https://www.jianshu.com/p/859e39475832?utm_campaign
对 group by 的优化
2、对于group by 过程来说,如果某一个key值有特别的多的记录,其它key值的记录比较少,也容易出项数据倾斜。
对 order by 的优化
对 count distinct 的优化
对文件格式的优化
只有一个 reduce 的情况
优化案例
http://www.360doc.com/content/16/0517/00/29157075_559747518.shtml
https://www.cnblogs.com/lenmom/p/11463216.html
https://blog.csdn.net/WYpersist/article/details/80030499
https://www.google.com/search?newwindow=1&biw=1536&bih=722&sxsrf=ALeKk03IyXKOXoM-Mm1Cl1lA8lX4yL8XNQ%3A1588779547538&ei=G9qyXt6vIIva0gTYiowo&q=hive+sum+%E6%95%B0%E6%8D%AE%E4%B8%A2%E5%A4%B1&oq=hive+sum+%E6%95%B0%E6%8D%AE%E4%B8%A2%E5%A4%B1&gs_lcp=CgZwc3ktYWIQDFAAWABgpIm1AWgAcAB4AIABAIgBAJIBAJgBAKoBB2d3cy13aXo&sclient=psy-ab&ved=0ahUKEwieqbqEyZ_pAhULrZQKHVgFAwU4FBDh1QMIDA
https://www.cnblogs.com/hustzzl/p/7888001.html