HiveSQL实战积累_hiveUDF原理与使用

HiveSQL实战积累_hiveUDF原理与使用

1.Hive的简单实现方法

简单实现方法步骤:

1.新建maven项目,在pom.xml文件中添加hiveUDF依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>

2.新建UDF实现类继承UDF类,实现evaluate抽象方法:

1
2
3
4
5
6
7
8
9
package hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class UDFHello extends UDF {
public String evaluate(String name){
return "hello"+name;
}
}

3.打成jar包上传至hive的lib路径下,并赋权:

1
[peizk@hadoop lib]$ chmod 777 UdfTest-1.0.jar

4.进入hive客户端,执行添加jar包命令:

1
hive (default)> add jar /home/peizk/app/hive-3.1.2/lib/UdfTest-1.0.jar;

添加之后可以使用list jars查看添加的jar,确认是否添加成功:

1
2
hive (default)> list jars;
/home/peizk/app/hive-3.1.2/lib/UdfTest-1.0.jar

5.在hive客户端执行创建临时函数命令:

1
hive (default)> create temporary function say_hello as "hive.udf.UDFHello";

6.在hiveSQL或者sparkSQL中使用该udf:

1
2
3
4
5
hive (default)> select say_hello('peizk');
OK
_c0
hellopeizk
Time taken: 1.657 seconds, Fetched: 1 row(s)

上述简单实现方法中,只是创建了临时函数,一旦退出hive客户端,再次进入hive客户端就没法儿再次使用该UDF函数了。如果想要一次添加多次使用可以通过两种方法实现:1.UDF函数注册到元数据,2.编译服务端源码。

2.商智常见使用方式

通常使用脚本的形式固化日常生产任务,一个脚本中就是一个日期动态的sql,那么直接将UDF jar包和生产脚本打包在一起,然后在生产脚本的sql中添加jar包并创建临时函数,然后使用UDF方法。

如下案例中编写了一个将真实数据进行指数化的UDF函数,用于进行真实数据脱敏,但是又能让下游进行趋势分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python3

import os,sys
from HiveTask import HiveTask
ht = HiveTask()

sql = """
use app;
set hive.map.aggr = true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
SET hive.exec.max.dynamic.partitions=100000;
SET hive.exec.max.dynamic.partitions.pernode=100000;
set mapred.output.compress=true;
set hive.exec.compress.output=true;
set mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec ;
set hive.mapjoin.smalltable.filesize=250000000;
set io.compression.codecs=com.hadoop.compression.lzo.LzopCodec ;
set hive.merge.smallfiles.avgsize=134217728;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;

set mapreduce.map.memory.mb=8192;
set mapreduce.map.java.opts=-Xmx6144M;
set mapreduce.map.cpu.vcores = 2;
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6144M;
set mapreduce.reduce.cpu.vcores = 4;
set yarn.app.mapreduce.am.resource.cpu-vcores = 3;
set yarn.app.mapreduce.am.resource.mb = 8192;
set yarn.app.mapreduce.am.command-opts = -Xmx6144m;
set mapreduce.task.io.sort.mb=1024;
set mapreduce.job.reduce.slowstart.completedmaps=1;

add jar common/hive.jar;
create temporary function tran_index as 'UDF.TranIndex';

INSERT overwrite table app.app_zh_industry_market_trend_trans_ref_new partition (dt = '"""+ ht.data_day_str + """' ,stat_ct_cd = 'day')
select
item_second_cate_cd --二级类目编码
,item_third_cate_cd --三级类目编码
,999999 as price_band_id --价格带 默认给之前老价格带中的全部编码
,case when trade_type='B2C' then '1'
when trade_type='POP' then '0'
when trade_type='ALL' then '2' end as trade_type --经营模式
,terminal_type --渠道
,stat_ct as stat_ct --yyyymmdd
,price_band as price_band_name --价格带名称
,pv as pv--浏览量
,tran_index(uv,15.37) as uv_index --访客指数
,search_num as search_num --搜索次数
,coalesce(search_click_num,0) as search_click_num --搜索点击次数
,case when coalesce(search_num,0) <> 0
then coalesce(search_click_num,0)/coalesce(search_num,0)
else 0 end as search_click_rate --搜索点击率
,add_cart_user_count --加购人数
,focus_user_count --关注人数
,deal_sale_qtty as deal_sale_qtty --成交商品件数
,tran_index(deal_sale_amount,11.97) as deal_sale_amount_index --成交金额指数
,case when coalesce(deal_user_count,0) <> 0
then coalesce(deal_sale_amount,0)/deal_user_count
else 0 end as customer_unit_price --成交客单价
,case when coalesce(uv,0) <> 0
then coalesce(deal_user_count,0)/uv
else 0 end as deal_trans_rate --成交转化率
,visited_sku_count --被访问商品数
,brand_count --品牌数
,visited_brand_count --被访问品牌数
,deal_brand_count --动销品牌数
,tran_index(pv,17.37) as pv_index--浏览量指数
,tran_index(search_num,21.97) as search_num_index --搜索次数指数
,tran_index(search_click_num,21.97) as search_click_num_index --搜索点击次数指数
,tran_index(deal_sale_qtty,15.28) as deal_sale_qtty_index --成交商品件数指数
,uv --访客数
,sale_ord_id_num --成交单量
,deal_sale_amount --成交金额
,deal_user_count --成交人数
,shop_count --店铺数
,visited_shop_count --被访问店铺数
,deal_shop_count --动销店铺数
,deal_sku_count --动销商品数

from adm.adm_zh_industry_item_index_all
where dt = '"""+ ht.data_day_str + """'
and stat_ct_cd = 'day'
and item_third_cate_cd <> ''
and item_third_cate_cd is not null
;
"""
#ht.exec_sql(schema_name = 'app' , sql = sql )
ht.exec_sql(schema_name = 'app', sql = sql,table_name = "app_zh_industry_market_trend_trans_ref_new" ,merge_flag = True, merge_part_dir = ['dt=' + ht.data_day_str+'/stat_ct_cd=day'], merge_type='mr')

在sql逻辑前面根据jar在脚本中的路径添加jar包,并创建临时函数:add jar common/hive.jar; create temporary function tran_index as 'UDF.TranIndex';,然后就可以在后续sql中使用该临时函数tran_index(pv,17.37) as pv_index

3.参考文献

Hive的UDF实现两种简单方法+通过编译源码添加UDF