数据开发之离线计算_Hive小文件产生原因与治理方法
1.背景
python计算任务脚本的最后两行命令都是:
1 | ht.exec_sql(schema_name = 'app', sql = sql) |
很明显,第一行命令是先执行编写号的hivesql,计算生产相应的hive表分区和数据;第二行命令是将一个分区目录下的小文件进行合并,减少文件数量。
我找了半天公司cf文档,都没有找到小文件合并工具mergefiles-1.7.jar的源码或者实现原理,那么我只能以此为切入点扩散积累一下hive小文件合并相关的知识。
2.动态分区
2.1 动态分区原理
静态分区和动态分区并不是真正的hive表分区,而是指执行insert类型hivesql时填写分区字段的方式。更准确来说,静态分区的列值是在代码编译时期通过用户传入来决定的,动态分区的列值则是hivesql执行过程中才能确定的。
静态分区使用方法如下,就是直接指定所有分区字段值:
1 | insert overwrite table table1 partition (dt='20210805', ht='00') |
半动态分区使用方法如下,注意静态分区字段一定要在动态分区字段前面:
1 | insert overwrite table table1 partition (dt='20210805', ht) |
动态分区使用方法如下,就是通过select查询出来的字段结果自动判断数据该load到哪一个分区去:
1 | insert overwrite table table1 partition (dt, ht) |
2.2 动态分区相关参数
hive.exec.dynamic.partition表示是否开启动态分区功能,默认值为false;当使用动态分区时该参数必须设置成true。
hive.exec.dynamic.partition.mode表示动态分区的模式,默认值为strict;其中strict模式表示必须指定至少一个分区字段为静态分区,nonstrict模式表示允许所有分区字段使用动态分区。
hive.exec.max.dynamic.partitions.pernode表示在每个执行MR任务的节点上,最大可以创建多少个动态分区,默认值为100;比如源数据中包含了一年的数据,也就是分区字段day有365个值,一次性处理一年数据时就必须将该参数设置为大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions表示在所有执行MR任务的节点上,最大一共可以创建多少个动态分区,默认值为1000。
hive.exec.max.created.files表示在整个MR任务过程中,最多可以创建多少个HDFS文件,默认值为100000;一般使用默认值就够了。
3.distribute by
3.1 order by
当hivesql中指定了order by时,不管有多少个map任务,所有的数据都会放到同一个reduce任务中进行全局排序,当数据量较大时标率非常低。即使设置了mapreduce.job.reduces参数值大于1,当使用order by时,仍然之后使用一个reduce任务处理所有数据。
3.2 sort by
当hivesql中指定了sort by时,会在每个reducer端对数据进行排序,为每个reduce任务产生一个排序文件。也就是说sort by只能保证局部有序,也就是每个reducer出来的文件是有序的。
3.3 distribute by
distribute by用于控制map端的数据以何种方式拆分给reduce端,会将distribute by后面的字段的值进行hash,然后将hash值相同的数据行分发给同一个reduce任务。
如person表存在数据如下:
| name | age | sex |
|---|---|---|
| zhangsan | 16 | male |
| lisi | 19 | female |
| wangle | 24 | male |
| maliu | 26 | female |
则采用如下hivesql语句,则MR任务会生成2个reduce任务,将zhangsan和wangle放到同一个reduce任务中处理,将lisi和maliu放到另一个reduce任务中处理:
1 | insert overwrite table person1 |
通常将distribute by与sort by配合使用,这样相当于先按照distribute by字段将数据分开存储到不同的reduce输出文件中,再将reduce输出文件中的数据按照sort by字段进行排序。注意distribute by与sort by配合使用时,distribute by必须写在sort by前面。
4.Hive小文件产生原因与治理方法
4.1 小文件产生原因
hivesql对应的MR任务执行过程中产生的文件主要有两种,一种是map任务输出文件,一种是reduce任务输出文件,两者之间的数量关系可以大致理解为:如果map任务数为m,reduce任务数为n,那么map任务输出文件数量就会是m*n,reduce任务输出文件数量就会是n个。
map任务输出文件本质是一堆中间文件,MR任务执行完成之后就会被删除,最终长久保存的是reduce输出文件。但是这两种类型的小文件过多都是有负面影响的,在不影响效率的情况下都应该尽量减少。
在不通过参数指定map数量和reduce数量的情况下,MR任务是根据数据量、文件类型、源文件数量等多方面因素来决定map数的,同时根据数据量、动态分区等来决定reduce数的。
那么经过上述分析hivesql产生过多小文件的主要原因有:
1.源数据本身小文件数量过多,对应产生的map数过多,导致中间map输出文件和最终reduce输出文件都过多。
2.MR任务为提升运算效率产生reduce数量过多,导致最终结果文件过多。
3.在使用了动态分区的情况下,每个map任务会为该map中的数据的每个不同动态分区生成一个文件,比如有100个map任务,每个map任务中又包含100个动态分区的数据,那么就会生成100*100个map输出文件。
4.2 小文件过多影响
Hive使用HDFS文件系统存储数据文件,我们所说的小文件问题,就是存储在HDFS上的数据文件是明显小于HDFS文件块大小的。
1.HDFS内存资源消耗过大,限制了数据存储规模
HDFS系统中,具体的文件保存在datanode节点中,每个文件都会有一份与之对应的元信息存储在namenode节点的内存中。因此,当过多的小文件消耗了namenode大量内存时,会使得集群扩展性收到极大限制。
2.数据访问更加耗时
在HDFS系统中,每次读写文件都需要先从namenode获取文件的元数据信息,然后与datanode建立连接。当小文件太多时会导致需要大量的定位寻址操作,这种访问方式严重影响性能。
3.数据运算时间与资源成本更高
在计算层面,小文件越多意味着MR任务执行过程中所需要创建的map任务也越多,每个map需要开一个JVM去执行,所以这些任务的启动和执行都会带来大量的时间和资源消耗,严重影响计算性能。
4.3 小文件治理方法
治理Hive小文件问题主要有两种思路:
1.从源数据端限制小文件的产生个数,就是尽量减少MR任务的map数和reduce数
在hivesql中设置如下参数控制MR任务执行过程中合并小文件:
1
2
3
4
5
6
7
8
9
10
11## 设置map输入合并小文件的相关参数
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
## 设置map输出和reduce输出进行合并的相关参数
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=256000000;如上文所述的distribute by语法可以用于控制在map端拆分数据给reduce,当hivesql中使用了动态分区时,最好使用
distribute by cast(rand()*N as int)将数据随机分配给reduce数,这样可以控制reduce数量也可以使得每个reduce处理的数据量大体一致,比如insert overwrite table table1 partition(date) select * from person distribute by cast(rand()*100 as int);可以控制每个分区目录下生成100个大小基本一致的数据文件。sparksql执行引擎生成的文件数量一般是RDD中partition数与表分区数的乘积,可以通过在最后一个RDD阶段添加repartition操作控制partition数,这一步的作用与distribute by一致,从而控制最终生成的文件数量。
2.对已经产生的小文件进行合并处理
- 最常见的就是创建离线任务将一个目录下的小文件合并为大文件,这是最通用的一种方法,背景中所述的mergefiles-1.7.jar工具应该也是采用这种思路。但是这类方法有一个缺点,合并时需要占用大量的计算资源。
- 针对ORC格式存储的表,可以使用concatenate命令对指定分区进行小文件合并,比如
alter table table1 partition (dt="20200101") concatenate;。