HiveSQL in Practice: Window Functions

1. Window Function Basics

1.1 Overview

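A window function computes a value for each row from the values of one or more rows, and it appears in the expression list of the SELECT clause. OVER is the keyword that specifies the window the function operates on, and it can contain three clauses: partitioning (partition by), ordering (order by), and the frame (the window range).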

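As shown in the figure above, compare window functions with group by: group by uses an aggregate function to collapse many rows into one, whereas a window function computes a value over the rows in each row's window and returns that value on every row, so the number of rows does not change.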

The general form of a window function expression is:

SELECT
function_name() over([partition by <partition columns>] [order by <order columns>] [<frame clause>])

Every part inside OVER is optional, e.g.:
SELECT sum(value) over()
SELECT sum(value) over(partition by <partition columns>)
SELECT sum(value) over(order by <order columns>)
SELECT sum(value) over(partition by <partition columns> order by <order columns>)

Function types that can be used as window functions:

  1. Aggregate functions: sum(…), count(…), max(…), min(…), avg(…)
  2. Ranking functions: rank(), dense_rank(), row_number()
  3. Offset (analytic) functions: lead(…), lag(…), first_value(…), last_value(…)

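Clauses inside OVER:

  1. partition by: the columns to partition by
  2. order by: the columns to sort by (ascending, ASC, by default)
  3. Frame: the window range, which sets the boundaries of the rows the function operates on; two modes are supported, rows and range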

1.2 How the Frame Works

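All rows that share the same partition by key form the window (partition) of a window function. A frame can further bound the rows inside that window, specifying which rows, relative to the current row, the function operates on.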

The window is illustrated below:

The terms mean the following:

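preceding              before the current row
following              after the current row
current row            the current row
unbounded preceding    from the first row of the partition
unbounded following    to the last row of the partition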

A frame is written as rows/range between <start> and <end>, for example:

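rows/range between unbounded preceding and unbounded following --all rows in the partition; the default frame when there is no order by
rows/range between unbounded preceding and current row --the current row and all rows before it; with order by and no explicit frame, the default is the range form of this frame
rows/range between 2 preceding and current row --the current row and the 2 rows before it (in range mode the offset is measured in order by values, not rows)
rows/range between 3 preceding and 1 following --3 rows before + the current row + 1 row after, 5 rows in total (rows mode)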
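To make the rows-mode boundaries concrete, here is a minimal Python sketch (not Hive code) that computes which rows a ROWS BETWEEN ... frame covers for a given current row. Offsets are clipped at the partition edges, and None stands for UNBOUNDED; the function names are hypothetical.

```python
def rows_frame(i, n, preceding, following):
    """Inclusive (start, end) indices of a ROWS frame for the row at position i.

    n is the partition size; preceding/following are row offsets, or None for
    UNBOUNDED. CURRENT ROW corresponds to an offset of 0.
    """
    start = 0 if preceding is None else max(0, i - preceding)
    end = n - 1 if following is None else min(n - 1, i + following)
    return start, end


def frame_rows(values, i, preceding, following):
    """Values covered by the frame of row i (values already in ORDER BY order)."""
    start, end = rows_frame(i, len(values), preceding, following)
    return values[start:end + 1]


vals = [10, 20, 30, 40, 50, 60, 70]
print(frame_rows(vals, 4, 3, 1))        # 3 preceding and 1 following: [20, 30, 40, 50, 60]
print(frame_rows(vals, 1, 2, 0))        # 2 preceding and current row, clipped: [10, 20]
print(frame_rows(vals, 3, None, 0))     # unbounded preceding and current row: [10, 20, 30, 40]
print(frame_rows(vals, 3, None, None))  # unbounded preceding and unbounded following: whole partition
```

Near the start of a partition, "2 preceding" simply covers fewer rows; the frame never reaches into another partition.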

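The difference between the two modes: rows mode delimits the frame by physical rows, so the boundary is a row count; range mode delimits it logically by the values of the order by column, so rows with equal values (peers) always fall inside or outside the frame together. Range mode therefore requires an order by column. An example makes the difference easy to see: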

SELECT
bs,
mins,
item_pv,
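SUM(item_pv) over(order by mins rows BETWEEN unbounded preceding AND CURRENT row) AS `rows`, -- rows and range are reserved words, hence the backticks; ties on mins accumulate row by row
SUM(item_pv) over(order by mins range BETWEEN unbounded preceding AND CURRENT row) AS `range` -- rows with equal mins (peers) all get the same total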
FROM
app.app_d14_traffic_test_ftx
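The effect of ties can be reproduced without a cluster. This Python sketch models the two running sums in the query above on hypothetical (mins, item_pv) pairs, two of which share mins = 2: in rows mode each tied row adds only itself, while in range mode every peer sees the total through the last row with that mins value.

```python
def running_sum_rows(rows):
    """SUM(pv) OVER (ORDER BY mins ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""
    ordered = sorted(rows, key=lambda r: r[0])  # stable sort; Hive's order among ties is not guaranteed
    out, total = [], 0
    for mins, pv in ordered:
        total += pv
        out.append((mins, pv, total))
    return out


def running_sum_range(rows):
    """Same with RANGE: the frame ends at the last peer, i.e. every row with mins <= current mins."""
    ordered = sorted(rows, key=lambda r: r[0])
    return [(mins, pv, sum(p for m, p in ordered if m <= mins)) for mins, pv in ordered]


data = [(1, 5), (2, 3), (2, 4), (3, 1)]   # hypothetical (mins, item_pv) rows
print(running_sum_rows(data))    # [(1, 5, 5), (2, 3, 8), (2, 4, 12), (3, 1, 13)]
print(running_sum_range(data))   # [(1, 5, 5), (2, 3, 12), (2, 4, 12), (3, 1, 13)]
```

The two columns differ only on tied rows; when the order by column is unique, rows and range produce the same running sum.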

2. Categories of Window Functions

2.1 Aggregate Functions

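  1. count(*): counts all rows in the window, including rows whose values are NULL.
  2. count(col): counts the rows where the column or expression is not NULL.
  3. sum(col): returns the sum of the expression, ignoring NULLs.
  4. max(col)/min(col): returns the maximum/minimum of the input values, ignoring NULLs.
  5. avg(col): returns the average of the input values, ignoring NULLs.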
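The NULL rules above are easy to mix up, especially that avg(col) divides by count(col) rather than count(*). A small Python model, with None standing in for NULL, makes them explicit; returning NULL when every input is NULL is the standard SQL behaviour for sum/max/min/avg. The function name is hypothetical.

```python
def null_aware_aggregates(column):
    """Model how the aggregate functions treat NULL (None) within one window."""
    non_null = [v for v in column if v is not None]
    return {
        "count(*)": len(column),                          # every row, NULLs included
        "count(col)": len(non_null),                      # NULLs skipped
        "sum(col)": sum(non_null) if non_null else None,  # NULL when there is no non-NULL input
        "max(col)": max(non_null) if non_null else None,
        "min(col)": min(non_null) if non_null else None,
        # avg divides by count(col), not by count(*)
        "avg(col)": sum(non_null) / len(non_null) if non_null else None,
    }


print(null_aware_aggregates([90, None, 70, 80, None]))
# {'count(*)': 5, 'count(col)': 3, 'sum(col)': 240, 'max(col)': 90, 'min(col)': 70, 'avg(col)': 80.0}
print(null_aware_aggregates([None, None]))
# {'count(*)': 2, 'count(col)': 0, 'sum(col)': None, 'max(col)': None, 'min(col)': None, 'avg(col)': None}
```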

Example SQL:

select 
grades
,subjects
,results
,sum(results) over(partition by grades,subjects order by results desc) as `sum聚合1`
,sum(results) over(partition by grades,subjects) as `sum聚合2`
,count(results) over(partition by grades,subjects order by results desc) as `count聚合1`
,count(results) over(partition by grades,subjects) as `count聚合2`
,min(results) over(partition by grades,subjects order by results desc) as `min聚合1`
,min(results) over(partition by grades,subjects) as `min聚合2`
,max(results) over(partition by grades,subjects order by results desc) as `max聚合1`
,max(results) over(partition by grades,subjects) as `max聚合2`
,avg(results) over(partition by grades,subjects order by results desc) as `avg聚合1`
,avg(results) over(partition by grades,subjects) as `avg聚合2`
from test11

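Here the 聚合1 (aggregation 1) columns partition the data by grades and subjects, sort each partition by results in descending order, and aggregate results cumulatively; the 聚合2 (aggregation 2) columns partition by grades and subjects and aggregate each partition as a whole. The difference comes from the default frame implied by order by: with order by and no explicit frame, the default frame is range between unbounded preceding and current row (so rows tied on results receive the same value); without order by, it is rows between unbounded preceding and unbounded following. Appending the frame rows between unbounded preceding and unbounded following to the OVER clause of aggregation 1 makes its results identical to aggregation 2.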
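The sketch below models sum聚合1 and sum聚合2 in plain Python on a few hypothetical (grades, subjects, results) rows, including a tie on results. It assumes the default range frame described above, so both tied 85s get the same running total, and sum_agg1 reaches sum_agg2 only on the last row of each partition.

```python
from collections import defaultdict


def window_sums(rows):
    """rows: (grades, subjects, results) tuples.

    Returns (grades, subjects, results, sum_agg1, sum_agg2) where
      sum_agg1 ~ SUM(results) OVER (PARTITION BY grades, subjects ORDER BY results DESC)
                 (default frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
      sum_agg2 ~ SUM(results) OVER (PARTITION BY grades, subjects)  (whole partition)
    """
    parts = defaultdict(list)
    for g, s, r in rows:
        parts[(g, s)].append(r)
    out = []
    for (g, s), vals in sorted(parts.items()):
        vals = sorted(vals, reverse=True)   # ORDER BY results DESC
        total = sum(vals)
        for r in vals:
            # DESC order: the frame holds every row with results >= current, peers included
            running = sum(v for v in vals if v >= r)
            out.append((g, s, r, running, total))
    return out


data = [("g1", "math", 90), ("g1", "math", 85), ("g1", "math", 85),
        ("g1", "math", 60), ("g1", "art", 70)]
for row in window_sums(data):
    print(row)
# ('g1', 'art', 70, 70, 70)
# ('g1', 'math', 90, 90, 320)
# ('g1', 'math', 85, 260, 320)
# ('g1', 'math', 85, 260, 320)
# ('g1', 'math', 60, 320, 320)
```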

2.2 Ranking Functions

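  1. rank(): integer; yields 1,2,2,4… (ties share a number and the numbers have gaps). A tie occupies the following rank positions.
  2. dense_rank(): integer; yields 1,2,2,3… (ties share a number and the numbers are consecutive). A tie does not occupy the following positions.
  3. row_number(): integer; yields 1,2,3,4… (unique, consecutive numbers); a plain row number. (The Hive execution plans in section 5 report it as int.)
  4. cume_dist(): double; the number of rows in the partition that sort before or tie with the current row, divided by the number of rows in the partition (with ascending order, the share of rows <= the current value). Tied rows take the position of the last tied row, so the range is (0,1].
  5. percent_rank(): double; the relative rank of each row, useful for "beats x% of people" statistics, computed as (rank of the current row - 1)/(number of rows in the partition - 1). Tied rows take the position of the first tied row, so the range is [0,1].
  6. ntile(n): integer; distributes the rows of each window partition into n buckets numbered 1 to n.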
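All six definitions can be checked by hand on one sorted partition. This Python model assumes the values are already in ORDER BY order (so peers are adjacent); the way ntile hands leftover rows to the first buckets follows the usual SQL convention and should be treated as an assumption for your engine version. Function names are hypothetical.

```python
def rank_functions(values):
    """Model row_number/rank/dense_rank/percent_rank/cume_dist for ONE partition.

    Assumes `values` is already in ORDER BY order (e.g. results DESC), so tied
    values (peers) are adjacent.
    """
    n = len(values)
    out = []
    rank = dense = 0
    for i, v in enumerate(values):
        if i == 0 or v != values[i - 1]:   # first row of a new peer group
            rank = i + 1                   # rank skips positions after ties: 1,2,2,4
            dense += 1                     # dense_rank does not skip: 1,2,2,3
        last_peer = i
        while last_peer + 1 < n and values[last_peer + 1] == v:
            last_peer += 1
        out.append({
            "value": v,
            "row_number": i + 1,
            "rank": rank,
            "dense_rank": dense,
            # (rank - 1) / (rows - 1): ties use the FIRST peer's position
            "percent_rank": (rank - 1) / (n - 1) if n > 1 else 0.0,
            # rows up to and including the LAST peer, divided by the row count
            "cume_dist": (last_peer + 1) / n,
        })
    return out


def ntile(n_rows, buckets):
    """Bucket number (1..buckets) for each of n_rows ordered rows; the first
    (n_rows % buckets) buckets receive one extra row each."""
    base, extra = divmod(n_rows, buckets)
    out = []
    for b in range(1, buckets + 1):
        out.extend([b] * (base + (1 if b <= extra else 0)))
    return out


scores = [95, 90, 90, 80, 70]   # one partition, ordered by results DESC
for row in rank_functions(scores):
    print(row)
print(ntile(len(scores), 3))    # [1, 1, 2, 2, 3]
```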

Example SQL:

select 
grades
,subjects
,results
,row_number() over(partition by grades,subjects order by results desc) as row_numbers
,rank() over(partition by grades,subjects order by results desc) as ranks
,dense_rank() over(partition by grades,subjects order by results desc) as dense_ranks
,ntile(3) over(partition by grades,subjects order by results desc) as ntiles
from test11

2.3 Offset (Analytic) Functions

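  1. lead(col,n,m): returns col from the row n rows after the current row: lead(column, offset n (optional, default 1), default value m returned when there is no such row (optional, default NULL)).
  2. lag(col,n,m): returns col from the row n rows before the current row: lag(column, offset n (optional, default 1), default value m returned when there is no such row (optional, default NULL)).
  3. first_value(): after sorting within the partition, returns the first value in the frame. With order by and no explicit frame, the frame is range between unbounded preceding and current row, so this is the first value of the partition.
  4. last_value(): returns the last value in the frame. With the same default frame, the frame ends at the current row (and its peers), so it returns the current row's value rather than the partition's last value; to get the latter, specify rows between unbounded preceding and unbounded following.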
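The following Python sketch models these functions on one partition that is already sorted by results DESC, using the same arguments as the example SQL (offset 1, default 0). It highlights the common pitfall that last_value under the default frame simply returns the current row's value. Function names are hypothetical.

```python
def lag(values, n=1, default=None):
    """lag(col, n, default): value n rows BEFORE the current row, else default."""
    return [values[i - n] if i - n >= 0 else default for i in range(len(values))]


def lead(values, n=1, default=None):
    """lead(col, n, default): value n rows AFTER the current row, else default."""
    return [values[i + n] if i + n < len(values) else default for i in range(len(values))]


def first_last_default(values):
    """first_value/last_value with ORDER BY and no explicit frame.

    The default frame runs from the partition start to the current row's last
    peer. Assumes `values` is the ORDER BY column, already sorted.
    """
    firsts, lasts = [], []
    for i, v in enumerate(values):
        end = i
        while end + 1 < len(values) and values[end + 1] == v:
            end += 1                      # extend the frame to the last peer
        firsts.append(values[0])
        lasts.append(values[end])
    return firsts, lasts


results = [95, 90, 90, 80]               # one partition, ORDER BY results DESC
print(lag(results, 1, 0))                # [0, 95, 90, 90]
print(lead(results, 1, 0))               # [90, 90, 80, 0]
print(first_last_default(results))       # ([95, 95, 95, 95], [95, 90, 90, 80])
# With ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, last_value is 80 on every row
print([results[-1]] * len(results))      # [80, 80, 80, 80]
```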

Example SQL:

select 
grades
,subjects
,results
,lag(results,1,0) over(partition by grades,subjects order by results desc) as `lag移位1`
,lead(results,1,0) over(partition by grades,subjects order by results desc) as `lead移位1`
,first_value(results) over(partition by grades,subjects order by results desc) as `first_value排序1`
,last_value(results) over(partition by grades,subjects order by results desc) as `last_value排序1`
from test11

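With order by, the data is partitioned by grades and subjects, sorted by results in descending order, and then computed over the default frame range between unbounded preceding and current row. Without order by, the functions operate on each partition as a whole, i.e. over the default frame rows between unbounded preceding and unbounded following.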

3. partition by vs. distribute by

3.1 order by: global sort

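order by performs a global sort of the data: whenever a HiveSQL query specifies order by, all of the data ends up on a single reducer to be sorted, so it is very inefficient when the data volume is large.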

3.2 sort by: local sort

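sort by sorts within each reducer and produces one sorted file per reducer. In other words, sort by only guarantees local order: the output of each reducer is sorted, but the data as a whole is not, unless there is only one reducer. The benefit is that after this local sort, a subsequent global sort becomes considerably more efficient.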

3.3 distribute by: partitioning

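distribute by controls how map output is divided among the reducers, i.e. which reducer a given row is sent to. By default a hash algorithm is used: the reducer is chosen from the hash of the distribute by key, so map output rows with the same key always go to the same reducer.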

distribute by is often combined with sort by.
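A toy simulation helps separate the two roles: distribute by decides which reducer a row lands on, and sort by only orders rows within that reducer. In this Python sketch an integer key is used as its own hash, a hypothetical stand-in for Hive's real hash function; the data are made up.

```python
def distribute_sort_by(rows, dist_key, sort_key, num_reducers):
    """Simulate DISTRIBUTE BY dist_key SORT BY sort_key.

    Each row goes to reducer (hash of key) % num_reducers; an integer key is
    used as its own hash here. Each reducer then sorts only its own rows.
    """
    reducers = [[] for _ in range(num_reducers)]
    for row in rows:
        reducers[dist_key(row) % num_reducers].append(row)
    return [sorted(part, key=sort_key) for part in reducers]


rows = [(1, 50), (2, 10), (3, 40), (4, 20), (1, 30), (2, 60)]   # (user_id, score)
parts = distribute_sort_by(rows, lambda r: r[0], lambda r: r[1], num_reducers=2)
for i, part in enumerate(parts):
    print(f"reducer {i}: {part}")
# reducer 0: [(2, 10), (4, 20), (2, 60)]
# reducer 1: [(1, 30), (3, 40), (1, 50)]

all_scores = [r[1] for part in parts for r in part]
print(all_scores == sorted(all_scores))   # False: each reducer is sorted, the whole output is not
```

Passing the same function as dist_key and sort_key mimics cluster by, and num_reducers=1 mimics the single-reducer global order of order by.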

3.4 group by

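group by is similar to distribute by in that both divide the data among reducers by key. The only difference is that distribute by merely distributes the data, whereas group by gathers rows with the same key so that they can be aggregated, and it must be followed by aggregation.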

3.5 cluster by

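In addition to the function of distribute by, cluster by also sorts by the same column. So distribute by and sort by on the same column are equivalent to cluster by, except that cluster by cannot specify asc or desc: it always sorts in ascending order.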

3.6 Difference between distribute by and partition by

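partition by can only appear inside the OVER clause of a window function (optionally together with order by); it cannot be used as a query clause after WHERE. distribute by can be used both inside a window function and as a clause of the query itself (after WHERE). Inside a window function, partition by [key…] order by [key…] and distribute by [key…] sort by [key…] behave identically.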

4. Hands-on: Precomputing Intraday Cumulative Metrics

4.1 Data Processing Path

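Pages that compare real-time with offline data, or that show a single day's trend, often need intraday cumulative data. Here, precomputing the cumulative traffic metrics pv and uv at 10-minute granularity for one day serves as the example of using window functions for intraday cumulative precomputation. The processing path is shown in the figure below: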

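In the figure, a time breakpoint means that some time intervals have no pv or uv at all, so the table produced by cumulative summing with a window function has no row for that time slot. A uv breakpoint means that some intervals have pv but no uv, so after the two cumulative metrics are merged, the uv of that time slot is 0 or empty.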
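Both kinds of breakpoint follow directly from how the cumulative sums are built. This Python sketch uses hypothetical counts for four 10-minute slots: a running sum computed only over slots that have rows cannot produce a row for an empty slot, and the union-all merge fills a missing metric with the 0 placeholder instead of the previous cumulative value.

```python
from itertools import accumulate


def cumulate(counts):
    """SUM(x) OVER (ORDER BY slot), evaluated only over slots that have a row."""
    slots = sorted(counts)
    return dict(zip(slots, accumulate(counts[s] for s in slots)))


def union_merge(cum_pv, cum_uv):
    """UNION ALL with 0 placeholders, then GROUP BY slot with SUM."""
    merged = {}
    for s, v in cum_pv.items():
        merged.setdefault(s, [0, 0])[0] += v
    for s, v in cum_uv.items():
        merged.setdefault(s, [0, 0])[1] += v
    return {s: tuple(merged[s]) for s in sorted(merged)}


# Hypothetical 10-minute slots 0..3 for one day
pv_by_slot = {0: 5, 1: 3, 3: 4}   # slot 2 has neither pv nor uv -> time breakpoint
new_uv_by_slot = {0: 2, 3: 1}     # slot 1 has pv but no new visitors -> uv breakpoint

merged = union_merge(cumulate(pv_by_slot), cumulate(new_uv_by_slot))
print(merged)   # {0: (5, 2), 1: (8, 0), 3: (12, 3)}
```

Slot 2 is missing entirely, and slot 1 shows a cumulative uv of 0 although the correct value is 2.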

4.2 Per-interval Statistics, Then Cumulative Sums with a Window Function

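SQL example (functions such as arrayJoin, leftPad, toString and toInt32 are ClickHouse functions, so this query runs on ClickHouse rather than Hive):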

SELECT
dt,
bs,
interval_index,
SUM(item_pv_10min_aggr) AS item_pv_10min_aggr,
SUM(item_uv_10min_aggr) AS item_uv_10min_aggr,
0 AS item_rt_10min_aggr,
0 AS item_login_users_10min_aggr
FROM
(
SELECT
dt,
bs,
interval_index,
SUM(item_pv_10min_aggr) over(partition BY dt, bs order by interval_index) AS item_pv_10min_aggr,
0 AS item_uv_10min_aggr
FROM
(
SELECT
dt,
arrayJoin(IF(bs = '7710', ['7710', '77'], [bs])) AS bs,
concat(REGEXP_REPLACE(dt, '-', ''), leftPad(toString(toInt32(toInt32(mins) / 6)), 2, '0'), leftPad(toString(10 *((mins) %6)), 2, '0')) AS interval_index,
SUM(item_pv) item_pv_10min_aggr
FROM
app.app_d14_traffic_plat_item_di_new_opt_aggr_d
WHERE
dt >= '2021-10-31'
AND dt <= '2021-10-31'
AND bs = '1302'
AND cate_bu_id = '6210'
AND mins <= 76
GROUP BY
dt,
bs,
interval_index
)
t

UNION ALL

SELECT
dt,
bs,
interval_index,
0 AS item_pv_10min_aggr,
SUM(item_uv_10min_aggr) over(partition BY dt, bs order by interval_index) AS item_uv_10min_aggr
FROM
(
SELECT
dt,
bs,
interval_index,
COUNT( *) AS item_uv_10min_aggr
FROM
(
SELECT
dt,
arrayJoin(IF(bs = '7710', ['7710', '77'], [bs])) AS bs,
bs_browser_uniq_id,
MIN(concat(REGEXP_REPLACE(dt, '-', ''), leftPad(toString(toInt32(toInt32(mins) / 6)), 2, '0'), leftPad(toString(10 *((mins) %6)), 2, '0'))) AS interval_index
FROM
app.app_d14_traffic_plat_item_di_new_opt_aggr_d
WHERE
dt >= '2021-10-31'
AND dt <= '2021-10-31'
AND bs = '1302'
AND cate_bu_id = '6210'
AND mins <= 76
GROUP BY
dt,
bs,
bs_browser_uniq_id
)
t
GROUP BY
dt,
bs,
interval_index
)
tt
)
ttt
GROUP BY
dt,
bs,
interval_index
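Two parts of this query deserve a closer look: how interval_index is built, and why the uv branch first takes MIN(interval_index) per visitor. Distinct counts are not additive (a visitor active in several slots would be counted once per slot), so the query assigns each visitor to the slot where they first appear; per-slot counts of new visitors can then be summed cumulatively. The Python sketch below mirrors both steps, assuming mins is a non-negative 10-minute slot number counted from midnight (which the /6 and %6 arithmetic implies); the visit data are hypothetical.

```python
from collections import Counter
from itertools import accumulate


def interval_index(dt, mins):
    """Mirror of concat(REGEXP_REPLACE(dt,'-',''), leftPad(toString(toInt32(toInt32(mins)/6)),2,'0'),
    leftPad(toString(10*((mins)%6)),2,'0')) for a 10-minute slot number mins."""
    hour, slot_in_hour = divmod(mins, 6)
    return f"{dt.replace('-', '')}{hour:02d}{10 * slot_in_hour:02d}"


def cumulative_uv(visits):
    """Mirror of the uv branch. visits: (visitor_id, slot) pairs.

    1. MIN(slot) GROUP BY visitor   -> the slot where each visitor first appears
    2. COUNT(*) GROUP BY that slot  -> number of new visitors per slot
    3. SUM(...) OVER (ORDER BY slot) -> distinct visitors seen so far
    """
    first_seen = {}
    for visitor, slot in visits:
        first_seen[visitor] = min(slot, first_seen.get(visitor, slot))
    new_per_slot = Counter(first_seen.values())
    slots = sorted(new_per_slot)
    return dict(zip(slots, accumulate(new_per_slot[s] for s in slots)))


print(interval_index("2021-10-31", 76))   # 202110311240 (slot 76 starts at 12:40)
visits = [("a", 0), ("b", 0), ("a", 1), ("c", 3), ("b", 3)]
print(cumulative_uv(visits))              # {0: 2, 3: 3}
```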

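The cumulative pv and uv computed separately with window functions are shown in the figure above. There are clearly time breakpoints, and the pv result has more rows than the uv result. After the two metrics are merged, uv breakpoints also appear, as shown in the figure below.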

4.3 Filling Breakpoints by Joining a Time Dimension Table

SQL example:

SELECT
dt,
bs,
concat(REGEXP_REPLACE(dt, '-', ''), leftPad(toString(toInt32(toInt32(mins) / 6)),2,'0'), leftPad(toString(10 *((mins) %6)),2,'0')) as interval_index,
MAX(item_pv_10min_aggr) as item_pv_10min_aggr,
MAX(item_uv_10min_aggr) as item_uv_10min_aggr,
0 as item_rt_10min_aggr,
0 as item_login_users_10min_aggr
FROM
(
SELECT
g_mins as mins,
d_mins
from
app.app_full_aggr_dim_d) aggr_dim
join (
select
dt,
bs,
interval_index,
SUM(item_pv_10min_aggr) as item_pv_10min_aggr,
SUM(item_uv_10min_aggr) as item_uv_10min_aggr,
0 as item_rt_10min_aggr,
0 as item_login_users_10min_aggr
from
(
select
dt,
bs,
interval_index,
sum(item_pv_10min_aggr) over(partition by dt,
bs
order by
interval_index) as item_pv_10min_aggr,
0 as item_uv_10min_aggr
from
(
select
dt,
arrayJoin(if(bs = '7710',['7710','77'],[bs])) as bs,
mins as interval_index,
sum(item_pv) item_pv_10min_aggr
from
app.app_d14_traffic_plat_item_di_new_opt_aggr_d
WHERE
dt >= '2021-10-31'
AND dt <= '2021-10-31'
AND bs = '1302'
AND cate_bu_id = '6210'
GROUP BY
dt,
bs,
interval_index) t
UNION ALL
select
dt,
bs,
interval_index,
0 as item_pv_10min_aggr,
sum(item_uv_10min_aggr) over(partition by dt,
bs
order by
interval_index) as item_uv_10min_aggr
from
(
select
dt,
bs,
interval_index,
count(*) as item_uv_10min_aggr
from
(
select
dt,
arrayJoin(if(bs = '7710',['7710','77'],[bs])) as bs,
bs_browser_uniq_id,
min(mins) as interval_index
from
app.app_d14_traffic_plat_item_di_new_opt_aggr_d
WHERE
dt >= '2021-10-31'
AND dt <= '2021-10-31'
AND bs = '1302'
AND cate_bu_id = '6210'
GROUP BY
dt,
bs,
bs_browser_uniq_id) t
group by
dt,
bs,
interval_index)tt)ttt
group by
dt,
bs,
interval_index) aggr on
aggr_dim.d_mins = aggr.interval_index
group by
dt,
bs,
interval_index

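Here, joining the time dimension table fans the data out (a partial Cartesian product): the dimension table pairs every time slot g_mins with every slot d_mins <= g_mins, so joining on d_mins = interval_index copies each cumulative row to all later slots, and MAX per slot then picks the latest cumulative value. This works because cumulative values never decrease. The time dimension table stores data in a format like the following: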

g_mins d_mins
0 0
1 0
1 1
2 0
2 1
2 2
3 0
3 1
3 2
3 3
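The fan-out plus MAX step can be modelled in a few lines of Python. The merged input below is a hypothetical union-all result with a time breakpoint at slot 2 and a uv breakpoint (0) at slot 1; the dimension table is generated as all (g_mins, d_mins) pairs with d_mins <= g_mins, matching the sample rows above.

```python
def fill_gaps(merged, max_slot):
    """Model of the 4.3 query.

    merged: {slot: (cum_pv, cum_uv)}, the union-all result with breakpoints.
    The dimension table holds every (g_mins, d_mins) with d_mins <= g_mins;
    joining on d_mins = slot copies each row to all later slots g, and
    GROUP BY g with MAX keeps the latest cumulative value (valid because
    cumulative values never decrease).
    """
    dim = [(g, d) for g in range(max_slot + 1) for d in range(g + 1)]
    out = {}
    for g, d in dim:                              # JOIN ... ON d_mins = interval_index
        if d not in merged:
            continue
        pv, uv = merged[d]
        best_pv, best_uv = out.get(g, (pv, uv))
        out[g] = (max(best_pv, pv), max(best_uv, uv))   # MAX(pv), MAX(uv) per g
    return out


merged = {0: (5, 2), 1: (8, 0), 3: (12, 3)}
print(fill_gaps(merged, 3))   # {0: (5, 2), 1: (8, 2), 2: (8, 2), 3: (12, 3)}
```

Because the join is an inner join, slots before the first slot with data would still be missing; only gaps after it are filled.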

Running the SQL fills in both the time breakpoints and the uv breakpoints; the result is shown in the figure below:

5. Hands-on: Optimizing Serial Window Functions

5.1 Execution Order of Multiple Window Functions in One SELECT

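The optimizer can do little for window functions. It first extracts the window functions from the projection into a separate operator called Window. A function's window is determined jointly by the partition by and order by columns in its OVER clause. When a SELECT contains several window functions, their windows may or may not be the same; they are the same only if both the partitioning and the ordering are identical. Window functions with the same window can run in the same Window operator; window functions with different windows are split into different Window operators, each of which must repartition and re-sort the data before running, and these operators must execute serially, as shown in the figure below.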
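The grouping rule just described can be expressed as a small model: window functions are bucketed by their (partition by, order by) pair, and each bucket becomes one Window operator with its own shuffle and sort. This Python sketch is a simplified model of that rule, not Hive's planner code; it compares column lists literally, and the third function, uv_sku_dense_rank, is hypothetical.

```python
def group_window_operators(window_funcs):
    """Simplified model of how window functions map to Window (PTF) operators.

    window_funcs: list of (alias, partition_cols, order_cols).
    Functions with identical partition and order lists share one operator;
    every distinct (partition, order) pair needs its own shuffle and sort,
    and those operators run one after another.
    """
    operators = {}
    for alias, partition_cols, order_cols in window_funcs:
        spec = (tuple(partition_cols), tuple(order_cols))
        operators.setdefault(spec, []).append(alias)
    return list(operators.values())


base = ["terminal_type", "price_band_id", "trade_type", "item_third_cate_cd"]
order = ["uv DESC", "pv DESC"]
funcs = [
    ("uv_sku_rank", base, order),
    ("uv_sku_brand_rnak", base + ["main_brand_id"], order),
    ("uv_sku_dense_rank", base, order),   # hypothetical: same window as uv_sku_rank
]
ops = group_window_operators(funcs)
print(len(ops), ops)
# 2 [['uv_sku_rank', 'uv_sku_dense_rank'], ['uv_sku_brand_rnak']]
```

Under this model, adding a function with an existing window is nearly free, while each new distinct window adds another serial shuffle-and-sort stage.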

5.2 Example: Optimizing Serial Window Functions

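Take the following SQL, which computes two rankings with window functions. The explain plan shows that the two window functions are executed one after the other, so as the number of window functions with different windows grows, the computation time grows roughly in proportion.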

explain
SELECT
item_third_cate_cd,
item_sku_id,
terminal_type,
trade_type,
main_brand_id,
price_band_id,
row_number() over(distribute BY terminal_type, price_band_id, trade_type, item_third_cate_cd sort by uv DESC, pv DESC) AS uv_sku_rank,
row_number() over(distribute BY terminal_type, price_band_id, trade_type, item_third_cate_cd, main_brand_id sort by uv DESC, pv DESC) AS uv_sku_brand_rnak
FROM
adm.adm_zh_industry_sku_index_all_ref
WHERE
dt = '2023-03-03'
AND stat_ct_cd = 'day';

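The execution plan is below. The two Window operators (the PTF Operator entries) are in Stage-1 and Stage-2, and Stage-2 depends on Stage-1, so they run serially: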

Explain
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: adm_zh_industry_sku_index_all_ref
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((dt = '2023-03-03') and (stat_ct_cd = 'day')) (type: boolean)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: terminal_type (type: int), price_band_id (type: string), trade_type (type: string), item_third_cate_cd (type: string), uv (type: bigint), pv (type: bigint)
sort order: ++++--
Map-reduce partition columns: terminal_type (type: int), price_band_id (type: string), trade_type (type: string), item_third_cate_cd (type: string)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
value expressions: item_sku_id (type: bigint), main_brand_id (type: string)
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey3 (type: string), VALUE._col5 (type: bigint), KEY.reducesinkkey0 (type: int), KEY.reducesinkkey2 (type: string), VALUE._col9 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey5 (type: bigint), KEY.reducesinkkey4 (type: bigint)
outputColumnNames: _col4, _col6, _col8, _col9, _col12, _col17, _col19, _col20
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
PTF Operator
Function definitions:
Input definition
input alias: ptf_0
output shape: _col4: string, _col6: bigint, _col8: int, _col9: string, _col12: string, _col17: string, _col19: bigint, _col20: bigint
type: WINDOWING
Windowing table definition
input alias: ptf_1
name: windowingtablefunction
order by: _col20(DESC), _col19(DESC)
partition by: _col8, _col17, _col9, _col4
raw input shape:
window functions:
window function definition
alias: row_number_window_0
name: row_number
window function: GenericUDAFRowNumberEvaluator
window frame: PRECEDING(MAX)~FOLLOWING(MAX)
isPivotResult: true
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col12 (type: string), _col17 (type: string), _col19 (type: bigint), _col20 (type: bigint), _col4 (type: string), _col6 (type: bigint), _col8 (type: int), _col9 (type: string), row_number_window_0 (type: int)
outputColumnNames: _col12, _col17, _col19, _col20, _col4, _col6, _col8, _col9, row_number_window_0
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col8 (type: int), _col17 (type: string), _col9 (type: string), _col4 (type: string), _col12 (type: string), _col20 (type: bigint), _col19 (type: bigint)
sort order: +++++--
Map-reduce partition columns: _col8 (type: int), _col17 (type: string), _col9 (type: string), _col4 (type: string), _col12 (type: string)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
value expressions: row_number_window_0 (type: int), _col6 (type: bigint)
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: int), KEY.reducesinkkey3 (type: string), VALUE._col6 (type: bigint), KEY.reducesinkkey0 (type: int), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey4 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey6 (type: bigint), KEY.reducesinkkey5 (type: bigint)
outputColumnNames: _col0, _col5, _col7, _col9, _col10, _col13, _col18, _col20, _col21
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
PTF Operator
Function definitions:
Input definition
input alias: ptf_0
output shape: _col0: int, _col5: string, _col7: bigint, _col9: int, _col10: string, _col13: string, _col18: string, _col20: bigint, _col21: bigint
type: WINDOWING
Windowing table definition
input alias: ptf_1
name: windowingtablefunction
order by: _col21(DESC), _col20(DESC)
partition by: _col9, _col18, _col10, _col5, _col13
raw input shape:
window functions:
window function definition
alias: row_number_window_1
name: row_number
window function: GenericUDAFRowNumberEvaluator
window frame: PRECEDING(MAX)~FOLLOWING(MAX)
isPivotResult: true
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col5 (type: string), _col7 (type: bigint), _col9 (type: int), _col10 (type: string), _col13 (type: string), _col18 (type: string), _col0 (type: int), row_number_window_1 (type: int)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

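Rewriting the SQL with union all plus sum() and group by, as below, lets the two SELECTs on either side of union all run concurrently, so the Window operators execute in parallel. This gives higher parallelism and faster execution but needs more compute resources: the table is scanned twice and twice as many rows are shuffled (the Union operator in the plan below reports 35463386 rows, twice the source row count). Two caveats apply: Hive only runs independent stages concurrently when hive.exec.parallel=true, and the rewrite is only correct if the group by columns uniquely identify a source row; otherwise sum() would add together the ranks of different rows.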

explain
SELECT
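item_third_cate_cd, --third-level category id
item_sku_id, --product SKU id
terminal_type, --channel
trade_type, --business model
main_brand_id, --main brand id
price_band_id, --price band id
SUM(uv_sku_rank) AS uv_sku_rank, --UV rank among all SKUs in the third-level category
SUM(uv_sku_brand_rnak) AS uv_sku_brand_rnak --UV rank among SKUs of the same main brand within the third-level category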
FROM
(
SELECT
item_third_cate_cd,
item_sku_id,
terminal_type,
trade_type,
main_brand_id,
price_band_id,
row_number() over(distribute BY terminal_type, price_band_id, trade_type, item_third_cate_cd sort by uv DESC, pv DESC) AS uv_sku_rank,
0 AS uv_sku_brand_rnak
FROM
adm.adm_zh_industry_sku_index_all_ref
WHERE
dt = '2023-03-03'
AND stat_ct_cd = 'day'

UNION ALL

SELECT
item_third_cate_cd,
item_sku_id,
terminal_type,
trade_type,
main_brand_id,
price_band_id,
0 AS uv_sku_rank,
row_number() over(distribute BY terminal_type, price_band_id, trade_type, item_third_cate_cd, main_brand_id sort by uv DESC, pv DESC) AS uv_sku_brand_rnak
FROM
adm.adm_zh_industry_sku_index_all_ref
WHERE
dt = '2023-03-03'
AND stat_ct_cd = 'day'
)
t
GROUP BY
item_third_cate_cd,
item_sku_id,
terminal_type,
trade_type,
main_brand_id,
price_band_id;
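Why does sum() recover both rankings? Each union branch fills the other ranking column with 0, so for every key exactly one branch contributes a non-zero value per column. The Python sketch below checks this on a few hypothetical SKUs (columns simplified to sku, cate, brand, uv); it relies on sku being unique, the same assumption the SQL makes about its group by columns.

```python
from collections import defaultdict


def row_number(rows, part_cols, sort_key):
    """row_number() OVER (PARTITION BY part_cols ORDER BY sort_key); returns {sku: rn}."""
    parts = defaultdict(list)
    for r in rows:
        parts[tuple(r[c] for c in part_cols)].append(r)
    rn = {}
    for members in parts.values():
        for i, r in enumerate(sorted(members, key=sort_key), start=1):
            rn[r["sku"]] = i
    return rn


def union_all_sum(rank_a, rank_b):
    """UNION ALL of (sku, rank_a, 0) and (sku, 0, rank_b), then GROUP BY sku with SUM."""
    unioned = [(k, v, 0) for k, v in rank_a.items()] + [(k, 0, v) for k, v in rank_b.items()]
    merged = defaultdict(lambda: [0, 0])
    for sku, a, b in unioned:
        merged[sku][0] += a
        merged[sku][1] += b
    return {k: tuple(v) for k, v in merged.items()}


def by_uv_desc(r):
    return -r["uv"]                                     # ORDER BY uv DESC


rows = [
    {"sku": 1, "cate": "c1", "brand": "b1", "uv": 90},
    {"sku": 2, "cate": "c1", "brand": "b2", "uv": 80},
    {"sku": 3, "cate": "c1", "brand": "b1", "uv": 70},
    {"sku": 4, "cate": "c2", "brand": "b3", "uv": 60},
]
rank_cate = row_number(rows, ["cate"], by_uv_desc)             # {1: 1, 2: 2, 3: 3, 4: 1}
rank_brand = row_number(rows, ["cate", "brand"], by_uv_desc)   # {1: 1, 3: 2, 2: 1, 4: 1}
print(union_all_sum(rank_cate, rank_brand))
# {1: (1, 1), 2: (2, 1), 3: (3, 2), 4: (1, 1)}
```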

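The execution plan is below. The two Window operators are now in Stage-1 and Stage-3; both are root stages, and Stage-2 (the union and group by) depends on both, so Stage-1 and Stage-3 can run concurrently: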

Explain
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1, Stage-3
Stage-3 is a root stage
Stage-0 depends on stages: Stage-2

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: adm_zh_industry_sku_index_all_ref
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((dt = '2023-03-03') and (stat_ct_cd = 'day')) (type: boolean)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: terminal_type (type: int), price_band_id (type: string), trade_type (type: string), item_third_cate_cd (type: string), uv (type: bigint), pv (type: bigint)
sort order: ++++--
Map-reduce partition columns: terminal_type (type: int), price_band_id (type: string), trade_type (type: string), item_third_cate_cd (type: string)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
value expressions: item_sku_id (type: bigint), main_brand_id (type: string)
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey3 (type: string), VALUE._col5 (type: bigint), KEY.reducesinkkey0 (type: int), KEY.reducesinkkey2 (type: string), VALUE._col9 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey5 (type: bigint), KEY.reducesinkkey4 (type: bigint)
outputColumnNames: _col4, _col6, _col8, _col9, _col12, _col17, _col19, _col20
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
PTF Operator
Function definitions:
Input definition
input alias: ptf_0
output shape: _col4: string, _col6: bigint, _col8: int, _col9: string, _col12: string, _col17: string, _col19: bigint, _col20: bigint
type: WINDOWING
Windowing table definition
input alias: ptf_1
name: windowingtablefunction
order by: _col20(DESC), _col19(DESC)
partition by: _col8, _col17, _col9, _col4
raw input shape:
window functions:
window function definition
alias: row_number_window_0
name: row_number
window function: GenericUDAFRowNumberEvaluator
window frame: PRECEDING(MAX)~FOLLOWING(MAX)
isPivotResult: true
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col4 (type: string), _col6 (type: bigint), _col8 (type: int), _col9 (type: string), _col12 (type: string), _col17 (type: string), row_number_window_0 (type: int), 0 (type: int)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Union
Statistics: Num rows: 35463386 Data size: 15178329326 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: sum(_col6), sum(_col7)
keys: _col0 (type: string), _col1 (type: bigint), _col2 (type: int), _col3 (type: string), _col4 (type: string), _col5 (type: string)
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 35463386 Data size: 15178329326 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: bigint), _col2 (type: int), _col3 (type: string), _col4 (type: string), _col5 (type: string)
sort order: ++++++
Map-reduce partition columns: _col0 (type: string), _col1 (type: bigint), _col2 (type: int), _col3 (type: string), _col4 (type: string), _col5 (type: string)
Statistics: Num rows: 35463386 Data size: 15178329326 Basic stats: COMPLETE Column stats: NONE
value expressions: _col6 (type: bigint), _col7 (type: bigint)
TableScan
Union
Statistics: Num rows: 35463386 Data size: 15178329326 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: sum(_col6), sum(_col7)
keys: _col0 (type: string), _col1 (type: bigint), _col2 (type: int), _col3 (type: string), _col4 (type: string), _col5 (type: string)
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 35463386 Data size: 15178329326 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: bigint), _col2 (type: int), _col3 (type: string), _col4 (type: string), _col5 (type: string)
sort order: ++++++
Map-reduce partition columns: _col0 (type: string), _col1 (type: bigint), _col2 (type: int), _col3 (type: string), _col4 (type: string), _col5 (type: string)
Statistics: Num rows: 35463386 Data size: 15178329326 Basic stats: COMPLETE Column stats: NONE
value expressions: _col6 (type: bigint), _col7 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0), sum(VALUE._col1)
keys: KEY._col0 (type: string), KEY._col1 (type: bigint), KEY._col2 (type: int), KEY._col3 (type: string), KEY._col4 (type: string), KEY._col5 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: adm_zh_industry_sku_index_all_ref
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((dt = '2023-03-03') and (stat_ct_cd = 'day')) (type: boolean)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: terminal_type (type: int), price_band_id (type: string), trade_type (type: string), item_third_cate_cd (type: string), main_brand_id (type: string), uv (type: bigint), pv (type: bigint)
sort order: +++++--
Map-reduce partition columns: terminal_type (type: int), price_band_id (type: string), trade_type (type: string), item_third_cate_cd (type: string), main_brand_id (type: string)
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
value expressions: item_sku_id (type: bigint)
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey3 (type: string), VALUE._col5 (type: bigint), KEY.reducesinkkey0 (type: int), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey4 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey6 (type: bigint), KEY.reducesinkkey5 (type: bigint)
outputColumnNames: _col4, _col6, _col8, _col9, _col12, _col17, _col19, _col20
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
PTF Operator
Function definitions:
Input definition
input alias: ptf_0
output shape: _col4: string, _col6: bigint, _col8: int, _col9: string, _col12: string, _col17: string, _col19: bigint, _col20: bigint
type: WINDOWING
Windowing table definition
input alias: ptf_1
name: windowingtablefunction
order by: _col20(DESC), _col19(DESC)
partition by: _col8, _col17, _col9, _col4, _col12
raw input shape:
window functions:
window function definition
alias: row_number_window_0
name: row_number
window function: GenericUDAFRowNumberEvaluator
window frame: PRECEDING(MAX)~FOLLOWING(MAX)
isPivotResult: true
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col4 (type: string), _col6 (type: bigint), _col8 (type: int), _col9 (type: string), _col12 (type: string), _col17 (type: string), 0 (type: int), row_number_window_0 (type: int)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 17731693 Data size: 7589164663 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
