HiveSQL实战积累_读取json数据

HiveSQL实战积累_读取json数据

我们在进行ETL(Extract-Transform-Load)的过程中,经常会遇到从不同数据源获取的不同格式的数据,其中某些字段是json格式的字符串,里面拼接了很多字段key和指标值value。

1. get_json_object

get_json_object(string json_string, string path)方法的第一个参数是字符串类型的json对象,第二个参数是json路径表达式:用$表示json根对象,用.读取对象的属性,用[]按下标读取数组元素(下标从0开始),.和[]可以相互嵌套。每次只能返回一个数据项;如果输入的json字符串无效,那么返回NULL。

案例1:获取单层值

1
2
3
-- Single-level lookup: the path $.name reads the name key of the root object.
SELECT
    get_json_object(
        '{"name":"大郎","sex":"男","age":25}',
        '$.name'
    );

-- Result: 大郎

案例2:获取多层值

1
2
3
-- Nested lookup: each dot in the path descends one level, so $.score.math
-- reads the math key of the object stored under score.
SELECT
    get_json_object(
        '{"name":"大郎","sex":"男","age":25,"score":{"math":30}}',
        '$.score.math'
    );

-- Result: 30

案例3:获取数组中json对象

1
2
3
-- Array element lookup: $[1] selects the second element of the root array
-- (indexes start at 0) and returns it as JSON text.
-- The path is written in single quotes, the standard SQL string delimiter,
-- so it matches the other examples and stays a string where double quotes
-- are treated as identifier quotes.
SELECT
    get_json_object(
        '[{"name":"大郎","sex":"男","age":25},{"name":"西门庆","sex":"男","age":47}]',
        '$[1]'
    );

-- Result: {"name":"西门庆","sex":"男","age":47}

案例4:获取数组中json对象的属性

1
2
3
-- Property of an array element: $[1].name first selects the second element
-- of the root array (indexes start at 0), then reads its name key.
-- The path is written in single quotes, the standard SQL string delimiter,
-- so it matches the other examples and stays a string where double quotes
-- are treated as identifier quotes.
SELECT
    get_json_object(
        '[{"name":"大郎","sex":"男","age":25},{"name":"西门庆","sex":"男","age":47}]',
        '$[1].name'
    );

-- Result: 西门庆

2.json_tuple

json_tuple(json_string, k1, k2 …)可以指定多个json数据中的key,返回对应的value。一次可以解析多个json字段,如果输入的json字符串无效,那么返回NULL。

案例:

1
-- Extract several top-level keys in one call. json_tuple is a table-generating
-- function that returns one column per requested key, so the columns are named
-- explicitly here instead of being left with auto-generated names.
SELECT
    json_tuple(
        '{"user_name":"chimchim","age":30,"sex":"woman"}',
        'user_name',
        'age',
        'sex'
    ) AS (user_name, age, sex);

3.使用嵌套子查询(explode+regexp_replace+split+json_tuple)解析json数组

1
2
3
4
5
6
7
8
9
-- Turn a JSON array of flat objects into one row per element, then read three
-- keys from every element.
-- Inner query: strip the outermost brackets, mark the boundary between two
-- elements with a separator character, split on it and explode the pieces.
-- Outer query: json_tuple reads the requested keys from each piece.
-- Limitation: the boundary is detected as a closing brace, a comma and an
-- opening brace, so elements that themselves contain arrays of objects are
-- split incorrectly.
SELECT
    json_tuple(o.json, 'user_name', 'age', 'sex') AS (user_name, age, sex)
FROM (
    SELECT
        explode(                                    -- one output row per array element
            split(
                regexp_replace(
                    regexp_replace(
                        '[{"user_name":"chimchim","age":30,"sex":"woman"},{"user_name":"zonzon","age":2,"sex":"man"}]',  -- JSON array to parse
                        '^\\[|\\]$', ''             -- remove only the outermost brackets, brackets inside values are kept
                    ),
                    '\\}\\s*,\\s*\\{', '\\}\\;\\{'  -- replace the comma between two elements (optional whitespace allowed) with a semicolon
                ),
                '\\;'                               -- split on that semicolon, written escaped as in the original statement
            )
        ) AS json
) o;

explode()函数可以接收一个array或者map类型的数据作为输入,然后将array或map里面的元素按照每行的形式输出,即将hive一列中复杂的array或者map结构拆分成多行显示,也被称为单元格转行函数。

lateral view需要和explode、json_tuple等UDTF配合使用(split本身是普通函数,常用来为explode准备数组),能将一行数据拆分成多行数据,在此基础上可以对拆分后的数据进行聚合。lateral view首先为原始表的每一行调用UDTF,UDTF会把一行拆分成一行或者多行,lateral view再把结果组合起来,产生一个可以指定别名的虚拟表。

regexp_replace(string A, string B, string C)函数可以将字符串A中的符合java正则表达式B的部分替换为C。注意,在有些情况下要使用转义字符。

4.解析服务日志使用案例

解析数据服务日志,以此为基础分析请求命中的数据库、请求的引擎耗时等,用来支持数据加速。

进入服务的原始请求日志参数结构,也就是doFetchBizData invoke.日志的params案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
{
"dataassetdrive": {
"uReqData": {
"header": {
"context": {
"uuidUnique": "billionSeckillCms_BfqX3u",
"SERVICE_ID": "422",
"INDEX_FREQ": "REALTIME",
"RES_APPKEY": "billionSaleData"
},
"appkey": "seckillcms",
"uuid": "billionSeckillCms"
},
"body": {
"criteria": {
"orders": [],
"aliass": [],
"criterions": [
{
"op": "<=",
"propertyName": "dt",
"clzType": "SimpleExpression",
"type": "string",
"value": "2023-07-31"
},
{
"op": ">=",
"propertyName": "dt",
"clzType": "SimpleExpression",
"type": "string",
"value": "2023-07-29"
},
{
"op": ">=",
"propertyName": "jdr_sch_ord_deal_tm",
"clzType": "SimpleExpression",
"type": "string",
"value": "2023-07-29 19:23:23"
},
{
"op": "<=",
"propertyName": "jdr_sch_ord_deal_tm",
"clzType": "SimpleExpression",
"type": "string",
"value": "2023-07-31 19:23:23"
},
{
"op": "in",
"propertyName": "sku_id",
"values": [
"10063266095130",
"10063266095131",
"10063266095132",
"10063266095133",
"10063266095134",
"10063266095135",
"10063266095136",
"10063266095137",
"10063266095138",
"10063266095139",
"10063266095140",
"10063266095141",
"10065860562725",
"10065860562726",
"10065860562727",
"10065860562728",
"10065860562729",
"10065860562730",
"10065860562742",
"10065860562741",
"10065860562740",
"10065860562739",
"10065860562738",
"10065860562737"
],
"clzType": "InExpression",
"type": "string"
}
],
"group": [
"sku_id"
]
},
"attributes": [
"sku_id"
],
"indicators": [
"jdr_sch_deal_ord_sku_piece_tenbillionsubsidy"
]
}
}
}
}

请求命中easydata数据库并返回的日志参数结构,也就是EasyDataExternalService success executeSql%日志的params案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"dataassetdrive": {
"apiName": "ck_pl_bp_ckpub22Query",
"eTime": 1690762422116,
"jsfAlias": "prod_l0_million_shop",
"totalElements": 1,
"number": 0,
"size": 100,
"numberOfElements": 1,
"costTime": 4,
"requestId": "352a8eba-a07b-42f8-9769-b39b71bbaa31",
"apiGroupName": "one-service-driven",
"totalPages": 1,
"model": "page",
"executeBO": {
"cluster": "LF1_CK_Pub_22",
"clusterType": "ClickHouse",
"apiName": "ck_pl_bp_ckpub22Query",
"dbTable": "app_eco_sz_neg_ser_sum_dist",
"firstResult": 0,
"sql": "SELECT \n sum(t.leave_msg_qtty) service_order_asmt_leave_msg_qtty, \n sum(t.work_qtty) service_standard_task_ws_qtty, \n sum(t.work_intime_nodis_qtty) jdr_eco_apply_afs_shop_qtty_eco_ser_nodis_ws_neg, \n sum(t.consult_bad_qtty) service_order_oper_dis_satisfy_est_qtty, \n sum(t.human_3m_receive_qtty) jdr_eco_ask_cus_shop_qtty_eco_ser_3min_neg, \n sum(t.consult_very_bad_qtty) service_order_oper_highly_dis_satisfy_est_rate, \n sum(t.sam_afs_bad_qtty) dissatisfaction_est_qtty, \n sum(t.consult_qtty) service_order_asmt_consult_qtty, \n sum(t.sam_afs_very_bad_qtty) highly_dissatisfaction_est_qtty, \n sum(t.bef_complete_qtty) temp_complete_qtty \nFROM service_board.app_eco_sz_neg_ser_sum_dist t -- 定义驱动 消极服务管控指标 \nWHERE ( \n t.stat_ct_cd = 'day' \n AND t.dt >= '2023-07-01' \n AND t.dt <= '2023-07-31' \n AND (t.shop_id = '11681334') \n)",
"tableName": "消极服务管控指标",
"existPre": true,
"maxResults": 100,
"apiGroupName": "one-service-driven",
"logicTableId": 1645,
"db": "service_board"
},
"sTime": 1690762422112,
"status": 200
}
}

解析sql案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
-- Pairs every data-service request log row with the database-execution log row that
-- shares its uuid and traceid, and flattens selected fields of the JSON params column
-- into ordinary columns.
-- NOTE(review): the dt filters contain host-language string concatenation around
-- ht.data_day_str, so this text looks like a fragment of a triple-quoted string in a
-- driver script (presumably Python) and is not runnable as standalone SQL - confirm.
-- NOTE(review): every escaped regex character carries six backslashes, presumably
-- because the host string consumes one escaping layer before Hive parses the literal -
-- confirm before pasting this statement into a Hive client directly.
SELECT
req.uuid,
req.traceid,
req.pin,
req.profile_name,
req.data_freq,
req.time_interval,
req.indicators,
req.criterions,
req.groups,
req.attributes,
req.orders,
CAST(db.cost_time AS bigint) AS cost_time,
db.message,
db.req_start_time,
db.req_end_time,
db.cluster_type,
db.cluster_name,
db.database_1,
db.table_1,
db.table_name,
db.data_form,
CAST(db.logic_table_id AS bigint) AS logic_table_id,
db.lvlcode,
-- constant empty string emitted under the name MODIFY
'' AS MODIFY,
db.sql,
-- raw JSON of the request row and of the execution row, both output columns are named params
req.params,
db.params
FROM
(
-- req: one row per (params, traceid, uuid, profilename) of the request log, merging the
-- two UNION ALL branches of tab1
SELECT
-- the REGEXP_REPLACE calls below are meant to drop the surrounding brackets and the
-- double quotes of the extracted JSON text, leaving a plain comma-separated list
MAX(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.body.indicators'), '^\\\\\\[|\\\\\\]$|\\\\\\"', '')) AS indicators,
-- each tab1 branch fills only one of criterions_property_name and orders_property_name
-- and sets the other to NULL, MAX keeps the filled value
MAX(tab1.criterions_property_name) AS criterions,
MAX(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.body.criteria.group'), '^\\\\\\[|\\\\\\]$|\\\\\\"', '')) AS groups,
MAX(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.body.attributes'), '^\\\\\\[|\\\\\\]$|\\\\\\"', '')) AS attributes,
MAX(tab1.orders_property_name) AS orders,
MAX(time_interval) AS time_interval,
MAX(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.header.context.PIN'), '^\\\\\\[|\\\\\\]$|\\\\\\"', '')) AS pin,
MAX(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.header.context.INDEX_FREQ'), '^\\\\\\[|\\\\\\]$|\\\\\\"', '')) AS data_freq,
tab1.params,
tab1.traceid,
tab1.uuid,
tab1.profilename AS profile_name
FROM
(
-- branch 1: explode the criterions array into one row per criterion object, then
-- aggregate back to one row per request
SELECT
-- distinct propertyName values of the criterions, comma separated
concat_ws(',', collect_set(GET_JSON_OBJECT(criterionsjson, '$.propertyName'))) AS criterions_property_name,
NULL AS orders_property_name,
-- value of the criterion whose propertyName is time_interval, NULL when there is none
MAX(
CASE
WHEN GET_JSON_OBJECT(criterionsjson, '$.propertyName') = 'time_interval'
THEN GET_JSON_OBJECT(criterionsjson, '$.value')
ELSE NULL
END) AS time_interval,
params,
traceid,
uuid,
profilename
FROM
-- the criterions array text has its outer brackets and all whitespace removed, the comma
-- between two objects is replaced by a caret, and the text is split on the caret so that
-- explode yields one object per row
-- NOTE(review): whitespace is removed everywhere in the text, including inside quoted
-- values such as timestamps - confirm this is acceptable for the value read above
-- NOTE(review): LATERAL VIEW without OUTER emits no row when explode receives NULL, so a
-- request whose JSON lacks this array is absent from this branch - confirm intended
fdm.fdm_jdr_plat_bdaa_service_log_prod_v1_log_i t lateral VIEW explode(SPLIT(REGEXP_REPLACE(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.body.criteria.criterions'), '^\\\\\\[|\\\\\\]$|\\\\\\s', ''), '\\\\\\},\\\\\\{', '\\\\\\}\\\\\\^\\\\\\{'), '\\\\\\^')) v AS criterionsjson
WHERE
-- request log rows of the day supplied by the host script
dt = '""" + ht.data_day_str + """'
AND doctype IN('dataassetdrive')
AND TRIM(MESSAGE) = 'doFetchBizData invoke.'
GROUP BY
params,
traceid,
uuid,
profilename

UNION ALL

-- branch 2: same request rows, but exploding the orders array instead of criterions
SELECT
NULL AS criterions_property_name,
-- distinct propertyName values of the orders, comma separated
concat_ws(',', collect_set(GET_JSON_OBJECT(ordersjson, '$.propertyName'))) AS orders_property_name,
NULL AS time_interval,
params,
traceid,
uuid,
profilename
FROM
-- same bracket stripping, whitespace removal and caret split as in branch 1, applied to
-- the orders array
fdm.fdm_jdr_plat_bdaa_service_log_prod_v1_log_i t lateral VIEW explode(SPLIT(REGEXP_REPLACE(REGEXP_REPLACE(GET_JSON_OBJECT(params, '$.dataassetdrive.uReqData.body.criteria.orders'), '^\\\\\\[|\\\\\\]$|\\\\\\s', ''), '\\\\\\},\\\\\\{', '\\\\\\}\\\\\\^\\\\\\{'), '\\\\\\^')) v AS ordersjson
WHERE
dt = '""" + ht.data_day_str + """'
AND doctype IN('dataassetdrive')
AND TRIM(MESSAGE) = 'doFetchBizData invoke.'
GROUP BY
params,
traceid,
uuid,
profilename
)
tab1
GROUP BY
tab1.params,
tab1.traceid,
tab1.uuid,
tab1.profilename
)
req
-- LEFT JOIN keeps request rows that have no matching execution row, their db columns are NULL
LEFT JOIN
(
-- db: execution log rows, with fields of dataassetdrive and of its executeBO object
-- exposed as columns
SELECT
params,
uuid,
traceid,
GET_JSON_OBJECT(params, '$.dataassetdrive.costTime') AS cost_time,
GET_JSON_OBJECT(params, '$.dataassetdrive.sTime') AS req_start_time,
-- the status field is exposed under the alias MESSAGE
GET_JSON_OBJECT(params, '$.dataassetdrive.status') AS MESSAGE,
GET_JSON_OBJECT(params, '$.dataassetdrive.eTime') AS req_end_time,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.cluster') AS cluster_name,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.clusterType') AS cluster_type,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.dbTable') AS table_1,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.existPre') AS data_form,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.logicTableId') AS logic_table_id,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.db') AS database_1,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.tableName') AS table_name,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.sql') AS SQL,
GET_JSON_OBJECT(params, '$.dataassetdrive.executeBO.lvlCode') AS lvlcode
FROM
fdm.fdm_jdr_plat_bdaa_service_log_prod_v1_log_i
WHERE
dt = '""" + ht.data_day_str + """'
AND doctype IN('dataassetdrive')
-- prefix match on the trimmed message selects the execution log rows
AND TRIM(MESSAGE) LIKE 'EasyDataExternalService success executeSql%'
)
db
ON
req.uuid = db.uuid
AND req.traceid = db.traceid