SELECT t1.a, t2.b FROM t1 JOIN t2 ON t1.id = t2.id;

Join在MaxCompute内部紧张有三种实现方法:

Broadcast Hash Join - 当Join存在一个很小的表时,我们会采取这种办法,即把小表广播通报到所有的Join Task Instance上面,然后直接和大表做Hash Join。

computehashphpMaxCompute Hash Clustering介绍 CSS

Shuffle Hash Join - 如果Join表比较大,我们就不能直接广播了。
这时候,我么可以把两个表按照Join Key做Hash Shuffle,由于相同的键值Hash结果也是一样的,这就担保了相同的Key的记录会网络到同一个Join Task Instance上面。
然后,每个Instance对数据量小的一起建Hash表,数据量大的顺序读取Join。

Sort Merge Join - 如果Join的表更大一些,#2的方法也用不了,由于内存已经不敷以容纳建立一个Hash Table。
这时我们的实现方法是,先按照Join Key做Hash Shuffle,然后再按照Join Key做排序,末了我们对Join双方做一个归并,详细流程如下图所示:

实际上对付MaxCompute本日的数据量和规模,我们绝大多数情形下都是利用的Sort Merge Join,但这实在是非常昂贵的操作。
从上图可以看到,Shuffle的时候须要一次打算,并且中间结果须要落盘,后续Reducer读取的时候,又须要读取和排序的过程。
对付M个Mapper和R个Reducer的场景,我们将产生M x R次的IO读取。
对应的Fuxi物理实行操持如下所示,须要两个Mapper Stage,一个Join Stage,个中赤色部分为Shuffle和Sort操作:

与此同时,我们不雅观察到,有些Join是可能反复发生的,比如上面的Query改成了:

SELECT t1.c, t2.d FROM t1 JOIN t2 ON t1.id = t2.id;

虽然,我们选择的列不一样了,但是底下的Join是完备一样的,全体Shuffle和Sort的过程也是完备一样的。

又或者:

SELECT t1.c, t3.d FROM t1 JOIN t3 ON t1.id = t3.id;

这个时候是t1和t3来Join,但实际上对付t1而言,全体Shuffle和Sort过程还是完备一样。

于是,我们考虑,如果我们初始表数据天生时,按照Hash Shuffle和Sort的办法存储,那么后续查询中将避免对数据的再次Shuffle和Sort。
这样做的好处是,虽然建表时付出了一次性的代价,却节省了将来可能产生的反复的Shuffle和Join。
这时Join的Fuxi物理实行操持变成了如下所示,不仅节省了Shuffle和Sort的操作,并且查询从3个Stage变成了1个Stage完成:

以是,总结来说,Hash Clustering通过许可用户在建表时设置表的Shuffle和Sort属性,进而MaxCompute根据数据已有的存储特性,优化实行操持,提高效率,节省资源花费。

功能描述功能开关目前Hash Clustering功能已经全面上线,缺省条件下即打开支持。
但是,如果须要利用clustered index,须要加上一下flag:

set MaxCompute.sql.cfile2.enable.read.write.index.flag=true;

这个flag打开后,将对排序后的Hash Bucket自动建立Index,提高查询效率。
如果希望利用index功能,则在建表和后续查询中都加上这个flag。
如果希望在project中一贯利用index,请与我们联系,我们可以把一个project default setting打开。

Clustered Index对付在排序键上的查询(等值或者范围)有显著帮助,但是纵然没有enable这个flag,仍旧可以享受到Hash Clustering其他性能提升的好处。

创建Hash Clustering Table

用户可以利用以下语句创建Hash Clustering表。
用户须要指定Cluster Key(即Hash Key),以及Hash分片(我们称之为Bucket)的数目。
Sort是可以选项,但在大多数情形下,建媾和Cluster Key同等,以便取得最佳的优化效果。

CREATE TABLE [IF NOT EXISTS] table_name

[(col_name data_type [comment col_comment], ...)][comment table_comment][PARTITIONED BY (col_name data_type [comment col_comment], ...)]

[CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS]

[AS select_statement]

举个例子如下:

CREATE TABLE T1 (a string, b string, c bigint) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;

如果是分区表,则可以用这样的语句创建:

CREATE TABLE T1 (a string, b string, c bigint) PARTITIONED BY (dt string) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;

CLUSTERED BY

CLUSTERED BY指定Hash Key,MaxCompute将对指定列进行Hash运算,按照Hash值分散到各个Bucket里面。
为避免数据倾斜,避免热点,取得较好的并行实行效果,CLUSTERED BY列适宜选择取值范围大,重复键值少的列。
此外,为了达到Join优化的目的,也该当考虑选取常用的Join/Aggregation Key,即类似于传统数据库中的主键。

SORTED BY

SORTED BY子句用于指定在Bucket内字段的排序办法,建议Sorted By和Clustered By同等,以取得较好的性能。
此外,当SORTED BY子句指定之后,MaxCompute将自动天生索引,并且在查询的时候利用索引来加快实行。

INTO number_of_buckets BUCKETS

INTO ... BUCKETS 指定了哈希桶的数目,这个数字必须供应,但用户该当由数据量大小来决定。
Bucket越多并发度越大,Job整体运行韶光越短,但同时如果Bucket太多的话,可能导致小文件太多,其余并发度过高也会造成CPU韶光的增加。
目前推举设置让每个Bucket数据大小在500MB - 1GB之间,如果是特殊大的表,这个数值可以再大点。

目前,MaxCompute只能在Bucket Number完备同等的情形下去掉Shuffle步骤,我们下一个发布,会支持Bucket的对齐,也便是说存在Bucket倍数关系的表,也可以做Shuffle Remove。
为了将来可以较好的利用这个功能,我们建议Bucket Number选用2的N次方,如512,1024,2048,最大不超过4096,否则影响性能以及资源利用。

对付Join优化的场景,两个表的Join要去掉Shuffle和Sort步骤,哀求哈希桶数目同等。
如果按照上述原则打算两个表的哈希桶数不一致,怎么办呢?这时候建议统一利用数字大的Bucket Number,这样可以担保合理的并发度和实行效率。
如果表的大小实在是相差太远,那么Bucket Number设置,可以采取倍数关系,比如1024和256,这样将来我们进一步支持哈希桶的自动分裂和合并时,也可以利用数据特性进行优化。

变动表属性

对付分区表,我们支持通过ALTER TABLE语句,来增加或者去除Hash Clustering属性:

ALTER TABLE table_name

[CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS

ALTER TABLE table_name NOT CLUSTERED;

关于ALTER TABLE,有几点须要把稳:

alter table改变聚拢属性,只对付分区表有效,非分区表一旦聚拢属性建立就无法改变。

alter table只会影响分区表的新建分区(包括insert overwrite天生的),新分区将按新的聚拢属性存储,老的数据分区保持不变。

由于alter table只影响新分区,以是该语句不可以再指定PARTITION

ALTER TABLE语句适用于存量表,在增加了新的聚拢属性之后,新的分区将做hash cluster存储。

表属性显示验证

在创建Hash Clustering Table之后,可以通过:

DESC EXTENDED table_name;

来查看表属性,Clustering属性将显示在Extended Info里面,如下图所示:

对付分区表,除了可以利用以上命令查看Table属性之后,于是须要通过以下命令查看分区的属性:

DESC EXTENDED table_name partition(pt_spec);

例如:

Hash Clustering的其他优点Bucket Pruning和Index优化

考虑以下查询:

CREATE TABLE t1 (id bigint, a string, b string) CLUSTERED BY (id) SORTED BY (id) into 1000 BUCKETS;

...

SELECT t1.a, t1.b, t1.c FROM t1 WHERE t1.id=12345;

对付普通表,这个常日意味着全表扫描操作,如果表非常大的情形下,资源花费量是非常可不雅观的。
但是,由于我们已经对id做Hash Shuffle,并且对id做排序,我们的查询可以大大简化:

通过查询值\公众12345\"大众找到对应的Hash Bucket,这时候我们只须要在1个Bucket里面扫描,而不是全部1000个。
我们称之为“Bucket Pruning”。

由于Bucket内数据按ID排序存放,MaxCompute会自动创建Index,利用Index loopup直接定位到干系记录。

可以看出来,我们不仅大大减少了Mapper的个数,并且由于利用了Index,Mapper可以直接定位到数据所在Page,加载读入的数据量也大大的减少了。

以下是安全部基于User ID查询场景的一个例子。
下面这个logview是普通的表的查询操作,可以看到,由于数据量很大,一共起了1111个Mapper,读取了427亿条记录,末了找符合条件记录26条,统共耗时1分48秒:

同样的数据,同样的查询,用Hash Clustering表来做,我们可以直接定位到单个Bucket,并利用Index只读取包含查询数据的Page,可以看到这里只用了4个Mapper,读取了10000条记录,统共耗时只须要6秒,如果用service mode这个韶光还会更短:

Aggregation优化

例如,对付以下查询:

SELECT department, SUM(salary) FROM employee GROUP BY (department);

在常日情形下,我们会对department进行Shuffle和Sort,然后做Stream Aggregate,统计每一个department group。
但是如果表数据已经CLUSTERED BY (department) SORTED BY (department),那么这个Shuffle和Sort的操作,也就相应节省掉了。

存储优化

即便我们不考虑以上所述的各种打算上的优化,单单是把表Shuffle并排序存储,都会对付存储空间节省上有很大帮助。
由于MaxCompute底层利用列存储,通过排序,键值相同或附近的记录存放到一起,对付压缩,编码都会更加友好,从而使得压缩效率更高。
在实际测试中,某些极度情形下,排序存储的表可以比无序表的存储空间节省50%。
对付生命周期很长的表,利用Hash Clustering存储,是一个很值得考虑的优化。

以下是一个大略的实验,利用100G TPC-H lineitem表,包含了int,double,string等多种数据类型,在数据和压缩办法等完备一样的情形下,hash clustering的表空间节省了~10%。

测试数据及剖析

对付Hash Clustering整体带来的性能收益,我们通过标准的TPC-H测试集进行衡量。
测试利用1T数据,统一利用500 Buckets,除了nation和region两个极小的表以外,别的所有表均按照第一个列作为Cluster和Sort Key。

整体测试结果表明,在利用了Hash Clustering之后,总CPU韶光减少17.3%,总的Job运行韶光减少12.8%。

详细各个Query CPU韶光比拟如下:

Job运行韶光比拟如下:

须要把稳到是TPC-H里并不是所有的Query都可以利用到Clustering属性,特殊是两个耗时最长的Query没有办法利用上,以是从总体上的效率提升并不是非常惊人。
但如果单看可以利用上Clustering属性的Query,收益还是非常明显的,比如Q4快了68%,Q12快了62%,Q10快了47%,等等。

以下是TPC-H Q4在普通表的Fuxi实行操持:

而下面则是利用Hash Clustering之后的实行操持,可以看到,这个DAG被大大的简化,这也是性能得到大幅提升的关键缘故原由:

功能限定及将来操持

目前Hash Clustering的第一阶段开拓事情完成,但还存在以下限定和不敷:

不支持insert into,只能通过insert overwrite来添加数据。
不支持小文件合并。
由于区域聚拢在切分的时候已经只管即便担保数据在各个bucket里面均匀分布,以是就不存在小文件的问题了。
而直接的文件merge将毁坏区域聚拢属性。
但是,我们仍旧支持通过merge和archive命令来改变表文件存储格式,以及RAID文件转换的功能。
不支持tunnel直接upload到range cluster表,由于tunnel上传数据是无序的。

作者:晋恒