ClickHouse_BitMap在hive和ck中的存储与两者之间的传输

ClickHouse_BitMap在hive和ck中的存储与两者之间的传输

BitMap

bitmap(位图)是一种利用比特位来进行数据存储的结构,简单举例:存储1-8的整数,如果我们用整数数组的话,1个int型整数是4字节,至少需要4*8=32个字节的存储空间,但是如果用bitmap的话,我们只需要1个字节(8bit),从低位到高位,每一位是否为1即可表示该数是否存在。

假设有这样一个需求:在20亿个随机整数中找出某个数m是否存在其中,并假设32位操作系统,4G内存。如果每个数字用int存储,那就是20亿个int,因而占用的空间约为 (20000000004/1024/1024/1024)≈*7.45G;如果按位存储就不一样了,20亿个数就是20亿位,占用空间约为 (2000000000/8/1024/1024/1024)≈0.233**G。高下立判,无需多言。

显然,使用bitmap能够显著节省用户存储空间,但也有一些局限性:1.存储的数据不能过于稀疏,比如只有1和10000两个数,那也需要10000/8=1250个字节;不能表现数据是否重复,因为每一位只有0和1,只能表示该数存在或不存在。

正因为上述特性,经常有一些面试中会考到bitmap的使用:1.给你40亿个不重复的整数,判断其中是否存在某个给定的整数,但是只有1G的内存;10亿个整数中出现重复的整数个数;10亿个数中只有1位为空,在内存只有几十兆的情况下找出为空的那个数。

和bitmap原理类似的还有更复杂一点儿的布隆过滤器(BloomFilter)。

bitmap简介与快速增删改查原理

RoaringBitmap

RoaringBitmap中每个32位的整形,高16位会被作为key存储到char[] keys中(可以按short[] keys来理解),低16位则被看做value,存储到Container[] values中的对应Container中。keys和values通过下标一一对应。size则标示了当前RoaringArray中包含的key-value pair的数量,即keys和values中有效数据的数量。

keys数组永远保持有序,方便二分查找。

下面介绍到的是RoaringBitmap的核心,三种Container。通过上面的介绍我们知道,每个32位整形的高16位已经作为key存储在RoaringArray中了,那么Container只需要处理低16位的数据:

  • ArrayContainer可以看作一个有序数组;
  • RunContainer常用于存储较为连续的数据;
  • BitmapContainer就是利用Bitmap原理使用一个65536bit结构来存储数据,大小就是8kb。

RoaringBitmap会基于存储数据情况根据优化策略自动进行三种Container之间的转换。

RoaringBitmap数据结构及原理

BitMap与RoaringBitmap的区别

1.RoaringBitmap更加节省内存空间

如果要用BitMap存储两个元素,其中一个是1,另一个是1亿,那么这个BitMap需要的存储空间就是1亿bit,这样明显产生了大量的空间浪费。那么使用RoaringBitmap将每个元素的32位分成2^16个容器,只为用到的容器分配空间,解决了稀疏数据浪费空间的问题;同时还有三种Container根据稠密程度进行转化,节约容器占用存储。

2.RoaringBitmap性能更强

因为使用存储更少,计算时就不会开辟大量内存,提升计算速度;同时使用有序数组保存容器,进行增删改查等运算能根据二分法快速索引到目标元素。

Clickhouse中的bitmap类型创建方法

1.使用bitmapBuild将整形数组列物化为bitmap列

建表:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE ge_order.spark_bitmap_test_1
(
    `dt` LowCardinality(String) COMMENT '日期',
    `dim_type` Int32 COMMENT '维度类型',
    `dim_id` Int32 COMMENT '纬度值',
    `encode` Array(UInt32) COMMENT '编码',
    `compare_encode` AggregateFunction(groupBitmap, UInt32) MATERIALIZED bitmapBuild(encode) --物化列
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMMDD(toDate(dt))
PRIMARY KEY (dim_type, dim_id)
ORDER BY (dim_type, dim_id)
SETTINGS index_granularity = 8192

此处compare_encode即为通过bitmapBuild(encode)获得的bitmap列,推数时直接推前4列即可:

1
2
insert into ge_order.spark_bitmap_test_1 
values ('2021-12-14' , 1 , 2370 ,[3,4,100,200]);

2.使用groupBitmap将明细列聚合为bitmap列

建明细表:

1
2
3
4
5
6
{
dt string 时间
brand 品牌 手机品牌
phone_model 手机型号
device_id String 设备号
}

建bitmap表:

1
2
3
4
5
6
{
dt LowCardinality(String) 时间
dim_type LowCardinality(String) 维度类型(1:品牌维度、2:手机型号维度)
dim_value LowCardinality(String) 每个dim_type维度下对应的品牌或者手机型号值
bitmap_dvid AggregateFunction(groupBitmap, UInt32) 明细
}

插入数据:

1
2
3
4
5
6
7
8
9
INSERT INTO bitmap
select
dt ,
1 ,
brand ,
groupBitmapState(toUInt32(dvid)) as dvid
from 明细表
where dt = 日期
group by brand

bitmap使用实例之hive缩减数据量级

离线流量计算任务优化,这是一个在hive上使用bitmap的案例

1.背景

当前数据量巨大,导致一步到位的计算性能及其不稳定。采用预汇总方式减少数据量时需要保证后续去重指标的计算,所以只能以最细粒度进行预汇总,数据量减少效果微乎其微。为了分散计算压力,把整体的数据流维度横向、关联聚合过程纵向进行切割,每个调度任务承接一段计算任务。导致计算链路错综复杂,任务量巨大。

2.bitmap方案

主要利用bitmap两个优势:占用存储(内存)少、去重。

以UV为例:

  • 占用存储(内存)少:brws_uniq_id为string类型,平均20个字符,转化为bitmap后存储为原来的20 * 8 = 160分之一
  • 去重:将数据信息压缩到bit中,0代表该值不存在,1存在。将去重指标的数据信息进行了保留,支持了再次上卷时的去重计算。

3.具体实现

  • 对string类型的brws_uniq_id取hash作为bitmap的key值;
  • 优化稀疏存储问题采用了RoaingBitmap压缩位图,
  • 防止hash碰撞,为brws_uniq_id生成Long类型的hash值,所以最终采用了Roaing64Bitmap。
  • hive数仓中使用binary类型存储,在UDF/UDAF中将binary反序列化为Roaring64Bitmap,进行各种运算。如有需要再序列化为binary存储。

聚合上卷生产:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3

import sys
import os
sys.path.append(os.getenv('HIVE_TASK'))
from HiveTask import HiveTask
ht = HiveTask()


dest_db = 'app'
dest_table_name = 'app_sz_traffic_shop_src_log_shop_bit'

dt = sys.argv[1]

sql = """
add jar hdfs://ns3/user/mart_sz/bi_sz/util/hive-bitmap-udf.jar;
CREATE TEMPORARY FUNCTION to_bitmap AS 'com.hive.bitmap.udf.ToBitmapUDAF';
CREATE TEMPORARY FUNCTION bitmap_union AS 'com.hive.bitmap.udf.BitmapUnionUDAF';
CREATE TEMPORARY FUNCTION bitmap_count AS 'com.hive.bitmap.udf.BitmapCountUDF';
CREATE TEMPORARY FUNCTION bitmap_and AS 'com.hive.bitmap.udf.BitmapAndUDF';
CREATE TEMPORARY FUNCTION bitmap_or AS 'com.hive.bitmap.udf.BitmapOrUDF';
CREATE TEMPORARY FUNCTION bitmap_xor AS 'com.hive.bitmap.udf.BitmapXorUDF';

INSERT OVERWRITE TABLE {table} partition(dt = '{dt}')
SELECT
shop_id,
chan_cd,
SUM(upv) AS pv,
SUM(tm_on_page) AS tm_on_page,
to_bitmap(hash(brws_uniq_id)) AS brwsBit,
to_bitmap(IF(upv = 1, hash(brws_uniq_id), NULL)) AS bncBrwsBit,
to_bitmap(IF(upv > 1, hash(brws_uniq_id), NULL)) AS notBncBrwsBit
FROM
(
SELECT
shop_id,
chan_cd,
brws_uniq_id,
COUNT(1) AS upv,
SUM(
CASE
WHEN tm_on_page > 0
AND tm_on_page < 1800
THEN tm_on_page
ELSE 0
END) AS tm_on_page
FROM
adm.adm_s14_zs_all_chan_shop_traffics
WHERE
dt = '{dt}'
GROUP BY
shop_id,
chan_cd,
brws_uniq_id
)
ALL
GROUP BY
shop_id,
chan_cd
""".format(table = dest_table_name, dt = dt)

ht.exec_sql(schema_name = dest_db ,
table_name = dest_table_name ,
sql = sql ,
exec_engine='spark',
spark_resource_level='high',
merge_flag = True,
retry_with_hive=False )

预计算使用,bitmap数据结构上有交、并、异或、与非等各种运算函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
--
SELECT
汇总维度 * ,sum(累加指标) , countBitMap(unionBitMap(去重指标))
FROM
app_sz_traffic_shop_src_log_bit
WHERE
cust_type IN(1, 2)
GROUP BY
cust_type

--
SELECT
汇总维度 * , countBitMap(onceBitMap bncBrwsBit)
FROM
app_sz_traffic_shop_src_log_bit

4.实际收益

得益于bitmap的优势,可以把300w的brws_uniq_id信息压缩在一个字段中,并且支持后续汇总去重。所以以终为始的方式对数据进行深度预汇总,形成轻量级的预汇总层,将进入到应用层的数据量减少到可以合并计算的量级。

目前店铺+平台粒度已上线稳定运行,数据量从10亿+ 减少到100W左右,执行时长1h左右;店铺+平台+渠道粒度,数据量从10亿+减少到1000W级别,执行时长2h左右。

该链路上任务量从50+减少为5个,存储减少50%(150T左右),时效提前1h。

5.引入新问题

问题:数据倾斜

由于某个汇总维度数据量过大时会导致单节点计算压力过大,普遍意义是指数据行数倾斜。该bitmap方案中会把大量的数据信息进行压缩存储到一个字段中,所以就有可能当信息量过大时造成单条数据过大,引发数据大小倾斜的问题。例如:京东大药房(1000015441)2022.12月每日uv从不足百万增长到2千万,月度uv高达5千万,单条数据大于200K,导致单个task的suffleRead量到达1G,执行时长从平均10min延长到1h。

解决方案:数据分桶

在汇总计算任务前添加分桶层,通过分桶层控制数据流走向,将大桶数据单独计算。例如:在三级来源加工链路中,轻量级预汇总层和最终汇总层间添加分桶打标层,默认为1桶,如果shopId+chanCd+fristSrc粒度下月至今总UV大于5千万分打标为2桶。下游分为2条数据流,根据配置参数分别接受各桶数据。这样可以通过自定义的分桶规则和识别策略,不断提高整条数据链路的稳定性。

bitmap使用实例之hive提升计算性能

1.背景

在hive中使用Roaring64Bitmap实现精确去重功能 主要目的:
1.提升 hive 中精确去重性能,代替hive 中的 count(distinct uuid);
2.节省 hive 存储 ,使用 bitmap 对数据压缩 ,减少了存储成本;
3.提供在 hive 中 bitmap 的灵活运算 ,比如:交集、并集、差集运算 ,计算后的 bitmap 也可以直接写入 hive;

2.在hive中创建自定义 bitmap UDF

bitmap UDF github地址

UDF下载地址

1
2
3
4
5
6
7
8
add jar hdfs://node:9000/hive-bitmap-udf.jar;

CREATE TEMPORARY FUNCTION to_bitmap AS 'com.hive.bitmap.udf.ToBitmapUDAF';
CREATE TEMPORARY FUNCTION bitmap_union AS 'com.hive.bitmap.udf.BitmapUnionUDAF';
CREATE TEMPORARY FUNCTION bitmap_count AS 'com.hive.bitmap.udf.BitmapCountUDF';
CREATE TEMPORARY FUNCTION bitmap_and AS 'com.hive.bitmap.udf.BitmapAndUDF';
CREATE TEMPORARY FUNCTION bitmap_or AS 'com.hive.bitmap.udf.BitmapOrUDF';
CREATE TEMPORARY FUNCTION bitmap_xor AS 'com.hive.bitmap.udf.BitmapXorUDF';

3.udf说明

4.在 hive 中创建 bitmap 类型表,导入数据并查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE IF NOT EXISTS `hive_bitmap_table`
(
k int comment 'id',
bitmap binary comment 'bitmap'
) comment 'hive bitmap 类型表'
STORED AS ORC;

-- 数据写入
insert into table hive_bitmap_table select 1 as id,to_bitmap(1) as bitmap;
insert into table hive_bitmap_table select 2 as id,to_bitmap(2) as bitmap;

-- 查询
select bitmap_union(bitmap) from hive_bitmap_table;
select bitmap_count(bitmap_union(bitmap)) from hive_bitmap_table;

5.在 hive 中使用 bitmap 实现精确去重

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE IF NOT EXISTS `hive_table`
(
k int comment 'id',
uuid bigint comment '用户id'
) comment 'hive 普通类型表'
STORED AS ORC;

-- 普通查询(计算去重人数)
select count(distinct uuid) from hive_table;

-- bitmap查询(计算去重人数)
select bitmap_count(to_bitmap(uuid)) from hive_table;

bitmap使用实例之clickhouse提升计算性能

1.业务需求

在任意时间段, 求关注的人群基数(去重)。如果使用传统数仓按天分区明细做法,计算成本会非常高,而且无法做到在线查询出结果的。

2.创建业务宽表

1
2
3
4
5
6
{
dt string 时间
brand 品牌 手机品牌
phone_model 手机型号
device_id String 设备号
}

3.创建bitmap表

简单分析下需求,我们要取品牌、型号、天维度下的人群基数,那么建立如下可进行自由分析的bitmap表:

1
2
3
4
5
6
{
dt LowCardinality(String) 时间
dim_type LowCardinality(String) 维度类型(1:品牌维度、2:手机型号维度)
dim_value LowCardinality(String) 每个dim_type维度下对应的品牌或者手机型号值
bitmap_dvid AggregateFunction(groupBitmap, UInt32) 明细
}

4.写入数据

分别插入不同维度的bitmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
--删除分区
alter table bitmap的表 on cluster default_cluster drop partition 时间分区

--插入品牌维度数据
INSERT INTO bitmap
select
dt,
1,
brand,
groupBitmapState(toUInt32(dvid)) as dvid
from 宽表 where dt = 日期
group by brand

--插入型号维度数据
INSERT INTO bitmap
select
dt,
2,
phone_model,
groupBitmapState(toUInt32(dvid)) as dvid
from 宽表 where dt = 日期
group by phone_model

5.查询新增、留存等聚合数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
--查询iphone12品牌在当前周期(2021-07-01~2021-09-29)相比上周期(2021-05-01 ~ 2021-06-30)的留存人数
SELECT
bitmapCardinality( bitmapAnd
(
SELECT
groupBitmapOrState(devid_bmp)
FROM
(
SELECT
devid_bmp
FROM
bitmap
WHERE
dt >= '2021-07-01'
AND dt <= '2021-09-29'
AND dim_type = 1
AND dim_value = 'iphone12'
)
, -- 当期周期关注过iphone12的人
SELECT
groupBitmapOrState(devid_bmp)
FROM
(
SELECT
devid_bmp
FROM
bitmap
WHERE
dt >= '2021-05-01'
AND dt <= '2021-06-30'
AND dim_type = 1
AND dim_value = 'iphone12'
) -- 上周期关注过iphone12的人
)
)

基于clickhouse的bitmap实现任意时间段的去重求基数操作

hive中的binary推送到clickhouse的bitmap

是CK的bitmap在Spark.sql中没有对应的类型,要将hive表的binary推送到ck的bitmap中,就需要保证两个不同架构(spark-scala&CK-c++)的底层编码一致。

对应的底层结构分析参考文档:通过clickhouse源码了解hive/spark的roaringbitmap写入clickhouse的bitmap