HiveSQL实战积累_HiveSQL通过横向拆分10份缓解数据倾斜实战

HiveSQL实战积累_HiveSQL通过横向拆分10份缓解数据倾斜实战

在数据量较大的场景下,可以按 skuId 等整数类型字段取模,将全量数据横向拆分为多份,由多个任务分别执行同一个脚本。这样做主要有两个好处:
1.提高并行度,争取更多资源,减小倾斜概率;

2.提高任务失败容错,当任务由于倾斜过于严重或者节点异常原因失败时,只需要重跑一个任务,不必重跑全量数据。

不过要注意:这种优化在执行过程中用到的临时表、临时文件夹一定要做到有始有终——开始时创建,结束时删除,否则因疏忽而日积月累造成的资源浪费会非常巨大。

拆分计算脚本

#!/usr/bin/env python3
import sys
import os
sys.path.append(os.getenv('HIVE_TASK'))
from HiveTask import HiveTask

# Compute one horizontal shard of gdm_m07_promt_sku_info into
# tmp.promt_sku_<mod_number>_<dt_int>. Each shard is run as a separate task;
# a separate merge step UNIONs the shard tables into the final table.
ht = HiveTask()
dt = ht.oneday(0)
dt_int = ht.oneday(0, '')
tb_name = 'gdm_m07_promt_sku_info'

# Number of horizontal shards the full data set is split into. The merge
# step UNIONs exactly this many tmp tables, so the two must agree.
SHARD_COUNT = 10

# argv[2] selects the shard this run computes. It is spliced verbatim into a
# table name and a WHERE clause, so only the exact strings
# '0' .. str(SHARD_COUNT - 1) are accepted; anything else would silently
# yield an empty shard, a malformed table name, or injected SQL.
_valid_mods = [str(i) for i in range(SHARD_COUNT)]
if len(sys.argv) < 3 or sys.argv[2] not in _valid_mods:
    sys.exit('mod_number (argv[2]) must be one of %s, got %r'
             % (', '.join(_valid_mods), sys.argv[2:3]))
mod_number = sys.argv[2]

# This shard's intermediate table (created in database tmp).
shard_table = 'promt_sku_' + mod_number + '_' + dt_int

# Columns selected from both range-product chain tables; the same list is
# used at every level of the dedup subquery.
range_cols = """
promoid,
skuid,
grouptype,
groupid,
yn,
created,
modified,
extinfo,
num,
price,
splitprice,
inprice,
cbj,
cfprice,
batchid,
venderid,
showstate,
buyprice"""

# Filter shared by both chain tables: rows whose [start_date, end_date)
# interval covers the data day and whose skuid belongs to this shard.
# pmod (instead of %) keeps the residue in [0, SHARD_COUNT) for negative
# skuids too, so every non-NULL skuid lands in exactly one shard.
# NOTE(review): rows with a NULL skuid match no shard and are dropped.
range_filter = """
start_date <= '{day}'
AND end_date > '{day}'
AND pmod(skuid, {shards}) = {mod}""".format(
    day=ht.data_day_str, shards=SHARD_COUNT, mod=mod_number)

# Join promotion headers with the latest version (row_number rn = 1 per
# promoid/skuid/grouptype, newest created/modified first) of this shard's
# range-product rows, materialised as an ORC table.
sql = """
use tmp;
DROP TABLE if exists {table};
create table {table} STORED AS ORC AS
SELECT
main.promt_id --促销编号
,main.promt_type_cd --促销类型
,main.promt_sub_type_cd --促销子类型代码
,main.vender_type promt_vender_type_cd --促销商家类型代码
,main.begin_time expt_start_tm --促销预期开始时间
,main.effect_time actual_start_tm --促销实际开始时间
,main.end_time expt_end_tm --促销预期结束时间
,main.check_state check_flag --促销审核标志
,main.promt_state promt_status_cd --促销状态代码
,main.create_time create_tm --促销创建时间
,main.delete_state delete_flag --促销删除标志
,main.delete_time delete_tm --促销删除时间
,range.skuid item_sku_id --商品SKU编号
,IF(range.grouptype = 4, 1, range.grouptype) sku_sale_type_cd --商品销售类型代码
,IF(range.grouptype = 4, 0, 1) use_flag --可用标志
,range.yn valid_flag --有效标志
,range.groupid pool_id --池编号
,range.num sku_num
,range.batchid --批次号
,range.venderid vender_id
,range.showstate show_state --显示状态
,range.price --促销创建时京东价
,range.splitprice split_price --拆分价
,range.inprice --进货价
,range.cbj --仓报价
,range.cfprice --创建时返现
,range.buyprice --加价购金额
,range.extinfo ext_info
,main.dt
FROM
(
SELECT
promt_id,
promt_type_cd,
promt_sub_type_cd,
begin_time,
end_time,
check_state,
promt_state,
delete_state,
delete_time,
vender_type,
effect_time,
create_time,
dt
FROM
gdm.gdm_jdr_sch_d07_mkt_promtion_di
WHERE
dt >= '{dt}'
)main
JOIN
(
SELECT{cols}
FROM
(
SELECT{cols},
row_number() over(partition BY promoid, skuid, grouptype order by created DESC, modified DESC) AS rn
FROM
(
SELECT{cols}
FROM
fdm.fdm_main_promo_1_rangeproduct_chain
WHERE{where}

UNION ALL

SELECT{cols}
FROM
fdm.fdm_main_promo_1_multirangeproduct_chain
WHERE{where}
)t0
)t1
WHERE rn = 1
) range
ON main.promt_id = range.promoid
;
""".format(table=shard_table, dt=dt, cols=range_cols, where=range_filter)

# table_name is the final target table rather than the tmp shard table;
# presumably HiveTask uses it for bookkeeping only — TODO confirm.
ht.exec_sql(
    schema_name='tmp',
    table_name=tb_name,
    sql=sql,
    exec_engine='spark',
    spark_resource_level='high',
    retry_with_hive=False,
    spark_args=[
        '--conf spark.executor.cores=3',
        '--conf spark.executor.memory=30g',
        '--conf spark.dynamicAllocation.maxExecutors=2000',
        '--conf spark.sql.shuffle.partitions=8000',
        '--hiveconf hive.exec.orc.split.strategy=ETL',
        '--hiveconf mapred.max.split.size=268435456',
        '--hiveconf mapred.min.split.size.per.node=268435456',
        '--hiveconf mapred.min.split.size.per.rack=268435456',
    ],
)

临时表合并

#!/usr/bin/env python3
import sys
import os
sys.path.append(os.getenv('HIVE_TASK'))
from HiveTask import HiveTask

# Merge today's shard tables tmp.promt_sku_<mod>_<dt_int> into
# gdm.gdm_m07_promt_sku_info, then drop yesterday's shard tables.
ht = HiveTask()
dt_int = ht.oneday(0, '')
dt_1_int = ht.oneday(-1, '')
db_name = 'gdm'
tb_name = 'gdm_m07_promt_sku_info'

# Number of shard tables produced by the split step; must match the shard
# count used there.
SHARD_COUNT = 10

# UNION ALL of today's shard tables, in shard order.
# NOTE(review): SELECT * assumes each shard table's columns are in the target
# table's column order with dt last (the split step selects main.dt as its
# final column), as required by the dynamic `partition (dt)` insert.
union_sql = '\nUNION ALL\n'.join(
    'SELECT * FROM tmp.promt_sku_%d_%s' % (mod, dt_int)
    for mod in range(SHARD_COUNT))

# Drop yesterday's shard tables; today's are left in place (presumably so the
# merge can be rerun without recomputing the shards). IF EXISTS because Spark
# SQL raises on DROP TABLE of a missing table, which would fail the job on the
# first run or after a skipped day.
# NOTE(review): only dt-1 is dropped, so tables from any day whose successor
# run was skipped are never cleaned up.
drop_sql = '\n'.join(
    'DROP TABLE IF EXISTS tmp.promt_sku_%d_%s;' % (mod, dt_1_int)
    for mod in range(SHARD_COUNT))

# NOTE(review): a fully dynamic `partition (dt)` insert needs
# hive.exec.dynamic.partition.mode=nonstrict; assumed to be set by HiveTask.
sql = """
use """ + db_name + """;
INSERT overwrite table """ + db_name + """.""" + tb_name + """ partition (dt)
""" + union_sql + """
;
""" + drop_sql + """
"""

ht.exec_sql(
    schema_name=db_name,
    table_name=tb_name,
    sql=sql,
    exec_engine='spark',
    spark_resource_level='high',
    retry_with_hive=False,
    spark_args=[
        '--conf spark.executor.cores=3',
        '--conf spark.executor.memory=30g',
        '--conf spark.dynamicAllocation.maxExecutors=2000',
        '--conf spark.sql.shuffle.partitions=8000',
        '--hiveconf hive.exec.orc.split.strategy=ETL',
        '--hiveconf mapred.max.split.size=268435456',
        '--hiveconf mapred.min.split.size.per.node=268435456',
        '--hiveconf mapred.min.split.size.per.rack=268435456',
    ],
)