数据开发之离线计算_join的原理与spark&ck中的join优化

数据开发之离线计算_join的原理与spark&ck中的join优化

一、常见join类型及其原理

1.单机join

常见的单机Join算法有四种:Nested-Loop Join (NL Join)、Block Nested-Loop Join、Sort Merge Join、Hash Join。

Nested-Loop Join

Nested-Loop Join嵌套循环join也就是建立内外两层for循环分别遍历左表和右表的每一行。假如左表m行,右表n行,则需要扫描和比较m*n次。在大数据时代,通常一张表数据量都是以亿为单位,如果使用Nested Loop Join算法,那么Join操作的比较次数直接就是天文数字了。所以Nested Loop Join基本上是作为万不得已的保底方案。

Block Nested-Loop Join

Block Nested-Loop Join是在Nested-Loop Join基础上的优化,将外层循环的数据表中数据按块批量加载到join buffer中,内层循环的每一行与整个buffer中的记录做比较,从而减少内层循环的次数。假如左表m行,右表n行,如果每次读取左表的10行放入buffer中,则需要扫描和比较m*n/10次。

Sort Merge Join

Nested Loop Join算法的关键问题在于比较次数过多,算法的复杂度为O(m*n)。如果集合中的元素是有序的,比较的次数会大幅度降低,避免很多无意义的比较运算。

通过将JOIN操作拆分成Sort和Merge两个阶段实现Join操作的加速。对于Sort阶段,是可以提前准备好可以复用的。这样的思想对于MySQL这类关系型数据库是非常友好的,这也能解释阿里巴巴开发手册中要求关联的字段必须建立索引,因为索引保证了数据有序。该算法时间复杂度为排序开销O(m*log(m)+n*log(n))+合并开销O(m+n)。但是通常由于索引保证了数据有序,索引其时间复杂度为O(m+n)。

Hash Join

Hash join的实现分为build table(也就是基于小表数据建立hashMap)和probe table,首先依次读取小表的数据,对于每一行数据根据关联键生成一个hashMap中的一个键值对,数据缓存在内存中,如果内存放不下需要dump到磁盘。依次扫描外表拿到每一行数据根据关联键生成hash key获取hashMap中对应的键值对。

2.分布式join

常见的分布式Join算法有四种:shuffle join、broadcast join、co-located join、pre-computed join。

shuffle join

shuffle join根据联接键值在节点之间重新分配来自两个表的行,这样具有相同联接键值的所有行都将移动到相同的节点。根据单机join的算法,shuffle连接可以是shuffle hash join、 shuffle sort-merge join等等。

broadcast join

广播联接将最小的表的全量数据复制移动到每个节点上,然后直接进行单机join即可。根据单机join的算法,shuffle连接可以是broadcast hash join、 broadcast sort-merge join等等。

co-located join

co-located join是指两张表的数据已经根据关联键按相同的分片算法散列在了不同节点中,直接单机join即可。

pre-computed join

pre-computed join是指两张表已经通过预计算的形式关联在一起并散列在不同节点中了。

spark常见join形式

1.broadcast hash join

spark中的broadcast hash join常用于大表与小表join场景,原理如第一节介绍,就是将小表广播到每个执行器后,构建hash table,通过大表关联字段作为key,从hash table映射即可判断是否有匹配。相当于遍历一次大表分区即可,而且分布式运行,性能较好。

1)在scala程序或者sparksql中没有加hint的情况下,只要满足如下条件,就会自动使用BHJ:

  • 默认BHJ是打开的,只要没有手动将spark.sql.autoBroadcastJoinThreshold参数设置为-1即可使用;

  • 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,如果没有配置,则默认是10M;

  • 被广播的表不能是基表,比如left outer join时,只能广播右表。

2)在加了hint的情况下,只要是等值连接(除full outer join),基本上都会产生broadcast join,不管参数autoBroadcastJoinThreshold是否配置。

如下添加了hint的scala程序:

1
2
3
4
5
6
7
import org.apache.spark.sql.functions.broadcast

val largeDF = // 一些大的 DataFrame
val smallDF = // 一些小的 DataFrame

// 告诉 Spark 对 smallDF 使用 broadcast join
val joinedDF = largeDF.join(broadcast(smallDF), Seq("id"))

如下添加了hint的sparksql:

1
2
3
4
-- 使用广播提示,提示Spark执行broadcast join
SELECT /*+ BROADCAST(smallTable) */ *
FROM largeTable
JOIN smallTable ON largeTable.id = smallTable.id;

2.Shuffle Hash Join

spark中的Shuffle Hash Join常用于大表与小表join场景,但是无法使用broadcast hash join的场景。原理如第一节介绍,就是将左右两表按相同的分区算法和分区数进行分区,然后在单机join。

1)在scala程序或者sparksql中没有加hint的情况下,需要满足如下条件,才会使用SHJ:

  • Shuffle Sort Merge Join手动关闭了,否则默认会先走Shuffle Sort Merge Join;
  • 小表的平均分区小于小于spark.sql.autoBroadcastJoinThreshold所配置的值,可以构建一个hash map。

2)在加了hint的情况下,则会优先考虑使用Shuffle Hash Join。

1
2
3
4
-- 使用shuffle hash join提示
SELECT /*+ SHUFFLE_HASH(largeTable) */ *
FROM largeTable
JOIN anotherLargeTable ON largeTable.id = anotherLargeTable.id;

3.Shuffle Sort Merge Join

spark中的Shuffle Sort Merge Join是Spark默认的join方式,可以通过参数spark.sql.join.preferSortMergeJoin配置开关,默认是true。原理如第一节介绍,就是对两张表参与Join的Keys使用相同的分区算法和分区数进行分区,保证hash值一样的不同表数据,并且在每个分区内排序,在下阶段都分发到同一个分区中,进行Merge Join。

1)在没有自动识别为broadcast hash join时,默认会使用sort merge join。

2)在加了hint的情况下,则会优先考虑使用Shuffle Sort Merge Joinn。

1
2
3
4
-- 使用shuffle hash join提示
SELECT /*+ SHUFFLE_HASH(largeTable) */ *
FROM largeTable
JOIN anotherLargeTable ON largeTable.id = anotherLargeTable.id;

4.Broadcast Nested Loop Join

在出现非等值连接时(not in)会使用此方式,本质是嵌套循环比较,效率较低。

参考文献

spark join 及优化

三、clickhouse常规join

1.单机join

ck中的本地join方式分为hash join和merge Join。引擎会自动优先使用hash join,当内存达到一定阈值后再使用 merge join,优先满足性能要求。

Hash Join

右表全部数据加载到内存,在内存构建hash table,key为joinkey;再从左表分批读取数据,与右表的hash table匹配数据。

Merge join

右表排序,内部block切分,超出内存部分flush到磁盘上,内存大小通过参数设定;再将左表基于block排序,按照每个block依次与右表merge。

从上也可以看出ck中的join都是将右表数据放入内存构建hash table,所以使用ck进行join一定要保持小表放在右边的习惯。

2.分布式join

ck中的分布式join主要有global join和普通的join。默认就是普通join,需要用户显式指定才会使用global join。global join主要是对分布式表的关联优化,不会影响关联结果正确性。

如下是之前记录的global in和普通in的区别,global join与普通join的区别也是一样的原理:


有一个查询成交用户榜单top10的页面,同时还要看这10个用户的其他指标,为了提高查询性能,先查出成交金额top10用户,再计算其他指标,sql如下,这是一个非常典型的使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SELECT
userID,
uniq(order_id) AS ordQtty
FROM
distributed_table1
WHERE
userID IN
(
SELECT
userID,
SUM(amt) AS dealAmt
FROM
distributed_table1
GROUP BY
userID
ORDER BY
dealamt limit 10
)
GROUP BY
userID

上述sql简化一下:

1
SELECT uniq(UserID) FROM distributed_table1 WHERE UserID IN (SELECT UserID FROM distributed_table2)

该命令首先会将distributed_table1分布式表替换为local_table1本地表下发到每个节点:

1
SELECT uniq(UserID) FROM local_table1 WHERE UserID IN (SELECT UserID FROM distributed_table2)

然后每个节点在执行where子集时,又会把子查询部分做一次转换下发给每一个节点:

1
SELECT UserID FROM local_table2

如果集群有n个节点,那么该sql就会有n*n次请求和数据传输,效率非常低。

global in用于解决该问题,会将查询分布式表的子查询语句先执行,然后将查询结果分发给所有节点放入内存中作为临时表,再执行外层查询,这样总请求数就变成了n*2。

global in使用时需要注意以下几点:

  • 临时表存储在内存中时,存储有限,在子查询中尽量做好去重;
  • 当使用globa join时,同理会先在请求者服务器运行一个子查询来计算右表并将其结果作为内存临时表,所以应尽量将小表放在右表。

看下来就可以发现,无论是global join还是普通join,其实都是broadcast join的放大版,ck并未实现真正意思上的Shuffle Join和Broadcast Join。

3.ck的join优化思路

避免Join

1)生成大宽表:数据预生成(由Spark/Flink或者Clickhouse物化视图产出数据),形成大宽表,基于单表的查询是 ClickHouse最为擅长的场景。

2)尽量使用IN代替JOIN:JOIN 需要基于内存构建hash table且需要存储右表全部的数据,然后再去匹配左表的数据。而IN查询会对右表的全部数据构建hash set,但是不需要匹配左表的数据,且不需要回写数据到block。

更快的Join

需要join的两表数据预先按照关联键分片。比如黄金眼的刷岗就是这么做的,将订单明细和sku维表都按sku进行shard分片,然后在本地表上按sku进行join刷新订单表中的商品岗位等维度信息。

更少的数据

不论是分布式JOIN还是本地JOIN,都需要尽量让少的数据参与JOIN,既能提升查询速度也能减少资源消耗,比如优化SQL下推等措施。

参考文献

ClickHouse Join为什么被大家诟病?

四、clickhouse的进化join

随着clickhouse的不断更新迭代,clickhouse的hash join也在不断进化,除了基本的算法种类,也衍生出了Parallel hash join和Grace hash join。

其中Parallel hash join以增加内存开销为代价,提高join并行度,提升join效率,但是会有OOM的风险;在这个基础之上又衍生出Grace hash join,当内存不足时,hash table会被溢出保存到磁盘上。这两种join方式的使用都需要在cksql尾部添加setting参数来指定,如SETTINGS join_algorithm = 'parallel_hash'SETTINGS join_algorithm = ``'grace_hash'

这篇博客中对这几个变种join做了非常详细的原理介绍和实验展示,非常值得一看ClickHouse中的Hash Join, Parallel Hash Join, Grace Hash Join)