ClickHouse_MergeTree&ReplicatedMergeTree&Distributed原理解析

ClickHouse_MergeTree&ReplicatedMergeTree&Distributed原理解析

1.MergeTree引擎

1.1 建表

ck在建表时需要指定几个关键信息:

  • order by(排序键):可以是一组列名的组合,用于数据片段内的数据排序。例如排序键是(CounterID,Date),则片段内数据首先按照CounterID排序,具有相同CounterID的数据行按照Date排序。
  • partition by(分区键):一般是一个列名,用于将不同的数据行按照分区键进行隔离存储,分区键相同的行存储在同一个分区中。
  • primary key(主键):可以是一组列名的组合,用于生成稀疏索引。
  • SETTINGS index_granularity = 8192:稀疏索引粒度,也就是每8192行数据生成一个一级索引值。

我们以一个排序好的数据片段为例,如下图:

MergeTree引擎的特点就是每次插入数据时,数据会以有序数据片段的形式写入磁盘,但是不同的数据片段之间的数据并不是有序的。ck后台会定期合并同属于一个分区的不同数据片段,使得整个分区的数据存储在一起并有序。

我们在建表时,可以不设置partition key,这样的话一张本地表相当于只有一个分区,每次插入数据之后全表合并重新排序。我们一般使用日期字段作为partition key。

我们建表时,一定要设置order key,用于数据片段内数据存储排序。可以不设置primary key,则自动以order key为primary key;如果设置了primary key,则必须保证primary key与order key的前缀,比如primary key为(a,b),order key必须为(a,b,%)。

就以上图为例,图中是以(CounterID,Date)为order key和primary key。如果以(CounterID,Date)为order key,以(CounterID)为primary key,则图中组成一级索引的标记就只由CounterID字段组成,应该是a,a,a,b,e,e,g,h,i,i,l。这样的话索引存储空间更小了,但是查询效率会降低。

1.2 数据合并过程

伴随着每次insert写入数据,MergeTree都会生成一批新的分区目录,每个分区目录对应一个有序数据片段。ck通过后台任务将属于相同分区的多个目录合并成一个新目录就是数据合并的过程,旧目录默认会在8分钟后通过后台任务删除。

以一个目录名称为例,介绍新目录的命名规则,如2022-10-01_1_3_1

2022-10-01:是partition key的值,就是分区名;

1:是MinBlockNum,表示该新目录合并的最小数据块的编号;

3:是MaxBlockNum,表示该新目录合并的最大数据块的编号;

1:是Level,表示合并的层级,也就是该分区被合并过的次数。

一个分区目录合并过程如下图所示:

分区目录中的文件作用如下图所示,一个表有多少个字段,就会有多少个[Column].bin数据文件,这也体现了列式存储特性:

primary.idx文件内存放采用稀疏索引实现的一级索引。稠密索引就是每一行索引标记对应一行具体的数据记录,稀疏索引就是每一行索引标记对应一段具体的数据记录。相较而言,稀疏索引占用存储空间较小,但是查询效率更低。

1.3 数据更新与删除

与hbase类似,ck也是只增数据库。ck中update与delete的本质是找到更新或删除数据所在的分区,然后新建该分区,使用新分区替换旧分区。

官方建议update/delete操作一次更新大量数据,因为更新的单位是分区。如果只更新一行数据,那么需要重建一次分区;如果一次更新100行数据,而这100行数据落在3个分区上,那么只需要重建3个分区。相较而言,一次更新一批数据的整体效率高于一次更新一行。

2.ReplicatedMergeTree引擎

2.1 建表

ReplicatedMergeTree引擎默认情况下会删除排序键值相同的重复项。

ReplicatedMergeTree引擎与Zookeeper配合,组成ck高可用的基础;ReplicatedMergeTree引擎与Distributed引擎配合,组成ck分布式的基础。

一个ReplicatedMergeTree建表语句案例如下:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE bc_online.ck_product_browselog_base_testjia on cluster default
(
`dateTime` String,
`pv` Float64,
`spuId` String
)
ENGINE = ReplicatedMergeTree('/ppzhck.jd.local/tables/bc_online/ck_product_browselog_base_testjia/{shard}', '{replica}')
PARTITION BY dateTime
ORDER BY (dateTime, spuId)
SETTINGS index_granularity = 8192;

重点关注ReplicatedMergeTree引擎建表时接受的两个参数:

  • zoo_path:Zookeeper中该表的路径
  • replica_name:Zookeeper中该表的副本名称

这两个参数中的分片名shard和副本名replica使用占位符宏替换即可,它们会被替换为ck配置文件里macros部分配置的值。

1
2
3
4
<macros>
<shard>01</shard>
<replica>01</replica>
</macros>

每一台ck服务器节点上的ck配置文件都是不一样的,使用shard和replica组合可以定位到唯一的一台ck服务器节点。

2.2 分片与副本信息查看

分片shard就是指包含数据不同部分的服务器,要读取所有数据,必须访问所有分片;副本replica是存储复制数据的服务器,要读取所有数据,访问任一副本上的数据即可。

使用如下sql可以查看ck集群的相关配置信息:

1
2
3
SELECT *
FROM system.clusters
WHERE cluster = 'LF0_CK_Pub_18';

得到查询结果:

cluster shard_num shard_weight replica_num host_name host_address port is_local user
LF0_CK_Pub_18 1 1 1 10.203.23.104 10.203.23.104 9600 0 default
LF0_CK_Pub_18 1 1 2 10.203.23.136 10.203.23.136 9600 0 default
LF0_CK_Pub_18 1 1 3 10.203.23.138 10.203.23.138 9600 0 default
LF0_CK_Pub_18 1 1 4 10.203.23.162 10.203.23.162 9600 0 default
LF0_CK_Pub_18 1 1 5 10.203.23.163 10.203.23.163 9600 0 default
LF0_CK_Pub_18 1 1 6 10.203.23.180 10.203.23.180 9600 0 default
LF0_CK_Pub_18 2 1 1 10.203.23.232 10.203.23.232 9600 0 default
LF0_CK_Pub_18 2 1 2 10.203.23.50 10.203.23.50 9600 0 default
LF0_CK_Pub_18 2 1 3 10.203.23.55 10.203.23.55 9600 0 default
LF0_CK_Pub_18 2 1 4 10.203.23.58 10.203.23.58 9600 0 default
LF0_CK_Pub_18 2 1 5 10.203.23.87 10.203.23.87 9600 0 default
LF0_CK_Pub_18 2 1 6 10.203.24.103 10.203.24.103 9600 0 default
LF0_CK_Pub_18 3 1 1 10.203.24.187 10.203.24.187 9600 0 default
LF0_CK_Pub_18 3 1 2 10.203.24.20 10.203.24.20 9600 0 default
LF0_CK_Pub_18 3 1 3 10.203.24.228 10.203.24.228 9600 0 default
LF0_CK_Pub_18 3 1 4 10.203.24.40 10.203.24.40 9600 0 default
LF0_CK_Pub_18 3 1 5 10.203.24.52 10.203.24.52 9600 0 default
LF0_CK_Pub_18 3 1 6 10.203.24.69 10.203.24.69 9600 0 default
LF0_CK_Pub_18 4 1 1 10.203.25.104 10.203.25.104 9600 0 default
LF0_CK_Pub_18 4 1 2 10.203.25.137 10.203.25.137 9600 0 default
LF0_CK_Pub_18 4 1 3 10.203.25.166 10.203.25.166 9600 0 default
LF0_CK_Pub_18 4 1 4 10.203.25.41 10.203.25.41 9600 0 default
LF0_CK_Pub_18 4 1 5 10.203.25.54 10.203.25.54 9600 0 default
LF0_CK_Pub_18 4 1 6 10.203.25.9 10.203.25.9 9600 0 default
LF0_CK_Pub_18 5 1 1 10.203.46.114 10.203.46.114 9600 0 default
LF0_CK_Pub_18 5 1 2 10.203.46.51 10.203.46.51 9600 0 default
LF0_CK_Pub_18 5 1 3 10.203.46.197 10.203.46.197 9600 0 default
LF0_CK_Pub_18 5 1 4 10.203.46.213 10.203.46.213 9600 0 default
LF0_CK_Pub_18 5 1 5 10.203.46.54 10.203.46.54 9600 0 default
LF0_CK_Pub_18 5 1 6 10.203.46.89 10.203.46.89 9600 0 default

从上表中可以看出,该LF0_CK_Pub_18集群上共配置了5个shard分片,每个分片具有6个replica副本。也就是说,在此LF0_CK_Pub_18集群上创建分布式表时,写入该表的数据会被分割成5个分片,每个分片上的数据是不一样的,合并起来才是全量数据;然后每个分片又有6个副本,这6个副本上的数据都是一模一样的。

2.3 主备同步

ReplicatedMergeTree引擎对Zookeeper的请求压力是非常的大。首先ck集群的副本节点是一个抢主逻辑,所有副本都可以公平写入,数据同步也是双向的。同一个分片的不同副本上任意一个副本上存在出现数据更新,都会将变化日志data part log注册到Zookeeper上,其他副本通过观察Zookeeper会异步的拷贝data part log来进行数据更新。

一行数据更新涉及到的与Zookeeper的交互就不下于10次,而且这种交互是数据分区粒度的,也就是说一次数据更新涉及到的数据分区有n的,那么ck集群与Zookeeper的交互次数就会是更新一个分区的n倍。那么同样的道理,使用ReplicatedMergeTree引擎进行数据写入时尽量使用batch写入并按照数据分区提前聚合,而不要一行一行的写入,否则对主备同步过程中对Zookeeper的请求压力非常大。

3.Distributed引擎

3.1 建表

Distributed引擎本身不存储数据,是一些本地物理表的分布式视图。

一个Distributed建表语句案例如下:

1
2
3
4
5
6
7
CREATE TABLE bc_online.ck_product_deal_detail_testjia_d on cluster default
(
`dateTime` String,
`pv` Float64,
`spuId` String
)
ENGINE = Distributed('default', 'bc_online', 'ck_product_deal_detail_testjia', rand());

Distributed引擎的参数为:集群名,数据库名,本地表名,数据分片键(可选)。

向分布式表中写数据有两种方法:

  1. 自行将数据进行分割,直接向每个shard分片上的本地表执行insert写入命令,使数据完全独立地写入不同的分片,这是最佳的写入方式,ck集群的网络开销最小。
  2. 将全量数据在分布式表上执行insert写入命令,在这种数据插入场景下,必须确保Distributed引擎配置了数据分片键,分布式表依据数据分片键的算法来计算每一行数据分发到哪个shard分片上。常见的数据分片键有随机rand()和哈希halfMD5hash(字段名)等。

平时我们用的就是自行分割数据分别写入本地表,具体实现另起一文介绍,本文先介绍一下写入分布式表实现原理。

3.2 写入分布式表

将数据写入ck分布式表的过程主要分为两步:分布式写入和副本同步。

分布式写入:客户端选择ck集群中的一台服务器建立连接,如节点1,那么所有的写入数据现在节点1完成写入。假设节点1的分片shard为1,副本replica为1,那么首先按照分片键定义的分区规则,计算每行写入数据所属分片shard。shard为1的数据直接写入节点1本地表,属于其他shard的数据先写入临时目录,然后向其余shard的某个主副本建立连接,将数据发送到对应shard。

副本同步:副本同步过程如上节所属主备同步过程。

参考文献

MergeTree引擎官方文档介绍

MergeTree原理解析

ClickHouse删改实现原理与优点总结

ClickHouse分布式表原理

ReplicatedMergeTree主备同步原理

Distributed分布式表写入流程