首页 >> 手游攻略

超大尺寸哔哩哔哩?在哔哩哔哩湖仓一体化平台中的实践

大家好,今天小编来为大家解答以下的问题,关于超大尺寸哔哩哔哩,在哔哩哔哩湖仓一体化平台中的实践这个很多人还不知道,现在让我们一起来看看吧!

导读今天给大家带来的分享是Trino在Bilibili湖仓一体的实践,

聚焦三个方面:

分享嘉宾|张明磊哔哩哔哩高级开发工程师

首先简单介绍一下整体的架构,如下图所示。

由于Trino在大部分情况下无法满足在毫秒内返回查询结果的需求,所以我们会建议对时延要求比较高的用户使用ElasticSearch/ClickHouse等时延小的组件。

我们支持的索引包括Min/Max、BloomFilter、BitMap、BloomRF(RangeFilter)、BITMAP、TOKEN-BloomFilter、TOKEN-BitMap索引。其中BloomRF支持范围查询并且可以有效减少存储空间,主要解决BitMap索引对于高基数字段存储空间较大的问题。

在日志场景下,我们提供了针对中文分词的TokenBloomFilter、TokenBitMap和英文分词的NgramBloomFilter、NGramBitMap索引。

值得注意的是,我们的索引分为轻量级和重量级。轻量级索引存储在Iceberg的manifest文件中,而重量级索引(如BitMap索引)存储在单独的索引文件里面,并且我们的索引文件和数据文件是一一对应的。

索引文件需要在计算引擎层面做适配进而在查询过程中跳过不需要读的文件,下面讲解如何实现。

Trino调度时首先会获取split并会将索引信息填充到split对象中。这样在下发到worker的时候,split中就已经包含有索引的相关信息了。在真正读取数据之前,算子会利用split的索引信息来判断用户输入的查询表达式是否真正需要读取该split。如果该split需要被读取,则无法跳过这个文件;如果不需要被读取,则直接返回EmptyPageSource。

我们对TrinoIceberg相关的UI也做了一些增强,并且索引是表级别的,由下图可以看到,使用索引来跳过数据文件分为两种,一种是在coordinator端跳过的(轻量级索引,如Min/Max索引和Bloomfilter索引),一种是在worker端跳过的(重量级索引,如BitMap索引)。

上图中的左上方的查询在使用Min/Max索引过滤文件时跳过了所有的数据文件,即在读取文件阶段时没有对任何文件进行读取。右下方的查询(group_id与左上方查询不同)中,在MinMax索引中无法跳过所有的数据文件,剩下的数据文件继续使用BitMap索引或BloomFilter索引进行过滤。

若join的左右两张表的数据表数量都很大,显然shufflejoin会对参与表的数据进行repartition,这样导致的网络开销会很大。另外多字段做aggregation计算量也很大。aggregation一般又分为两个阶段,partial和final。若是在aggregation的partial阶段没有预聚合效果的话,那么partial阶段就可以被省略。

关联列是针对Join场景做的优化。SSB(StarSchemaBenchmark)场景大部分查询的过滤条件在右表上,dynamicfilter生成左表joinkey的Min/Max值区间范围大,此时用joinkey做索引过滤效果较差。

在这种背景下我们引入了关联列。它是一个定义在事实表上的虚拟列且通过关联关系得到。我们可以基于关联列做数据组织优化并创建索引,计算引擎把基于关联列的filter下推到事实表进行过滤数据即可适配。

预计算并没有这样去使用关联列,只是使用了关联列的语法定义。对关联列的添加语法如下图所示。

三张表做join的原始查询如下,其中orderdate和regionkey是维度字段。

除了定义维度字段,还需要定义聚合值

我们拓展了Iceberg中的Action操作生成预计算文件

原始查询在集群上运行超过了SLA的时间限制。通过预计算处理之后,该查询可以在一秒钟之内跑出结果。

如果为每一种查询均生成CUBE,那么很容易造成维度上的存储爆炸。那么是否只生成一次CUBE就可以响应不同的查询。比如A?B?C生成的CUBE是否可以响应A?B或者B?C或者A?C。

为了解决这个问题,我们引入RecordPreservedJoin(RPJ)这个概念,对于表A和表B,如果A的每一条记录都出现在A?B的结果中,并且没有其他的记录,那么称该Join为RecordPreservedJoin。即join之后,表A内的数据在join结果中出现且仅出现一次。

可以发现左表的数据没有增加也没有减少,那么该join就是一个RPJ。RPJ对数据是有要求的,那么什么样的数据才能满足RecordPreservedJoin?通过对JoinKey做限制,这些限制包括了:

注意,RPJ是创建CUBE的非必要条件。如果Join不是RPJ,则单独为查询创建CUBE即可,但理论上是存在维度爆炸的风险。比如说,维度字段有10个,则有2的10次方的组合。理论上这么多的CUBE会耗费很大的存储空间。

预计算生成CUBE的阶段已经把Join和针对每个文件aggregation的工作完成了,所以在查询阶段就已经不需要额外的Join操作。下图是查询改写的过程,我们会对这个逻辑计划树进行Cube定义的匹配操作,若是能够匹配上,则将左侧等价地修改为右侧针对Cube的查询并会将读取数据文件修改为Cube的模式,不再读取原始数据。

要注意到Cube的生成是异步的,因此需要支持部分文件没有生成Cube的情况。这种情况下,会将执行计划修改为如下图所示。图中右侧是已经生成Cube,左侧是还未生成Cube的原始数据,对这些数据进行partialaggregation,然后对两侧的查询结果进行union操作就得到了最终的结果。

在Runtime阶段,我们需要注意在调度split时,要包含cube的信息(如文件路径),以保证worker的TableScanOperator能够从cubefile中读取文件,而不是从datafile中读取文件。显然,如果预聚合程度很高的话,聚合文件是非常小的。当聚合效果很好时,甚至可以将1GB的原始数据聚合为1KB的cubefile。由于读取的数据量小并且没有进行join操作,因此查询性能会非常高。

下图为预计算的性能测试结果。可见预计算场景下,性能会有5到40倍的提升;而在关联列场景下,性能会有1到5倍的提升。

3)如何利用Iceberg的统计信息来优化查询

这里主要有两部分的工作,第一部分是优化Tirno的Min/Max/Count函数,第二部分是优化Trino逻辑计划树上的Sort和TopN节点。

下图为Iceberg的Manifest文件存储的统计信息。其中lowe_bounds和upper_bounds存储了各列的最大和最小值;若文件已经按照某列进行了排序,则sort_order_id不为0,这时可以用该列来优化sort和topN节点。

显然,直接将datafile文件读入内存进行聚合是很耗时的。我们可以通过直接读取manifest文件内保存的min/max/count这些基本统计信息来避免datafile的读取。

对应地,逻辑计划树就可以由对datafile的tablescan和aggregation操作修改为对manifest的读取。

下图三张图表示了优化效果。可以看到,2200亿的数据量内性能提升相当明显。

第二个优化是基于日志场景的,一般典型的日志格式为:[2023?08?05T11:40:39][ip=192.168.1.1],XXX,YYY,ZZZ。包含有时间、IP、日志内容等信息。

下图为一个线上查询,可以看到这个查询通过_timestamp进行了排序。

以7月5号这一天的分区为例,统计信息如下:

针对这样一条查询,会将所有的日志数据载入内存,然后进行排序以获取topN。即使我们为timestamp、ip列创建索引以加速日志查询,有时还不足以能够满足用户期望的延时。尤其当数据量特别大的时候,TopN算子会进行大量的计算,导致性能低下,那么我们是否可以利用Iceberg统计信息对Sort、TopN算子进行优化。

我们下推了PartialSort。下图中的每一部分都是查询计划树的不同优化阶段。我们在Addlocalexchange之后改写了查询计划树。选择在Addlocalexchange之后的一个显而易见的原因是可以尽可能地利用优化后的查询计划树,在匹配到Exchange节点之后,我们把Exchange的Source节点,即PartialSortNode节点移除并且只在最终final阶段做一次归并排序就可以了。

TopNNode情况会稍微复杂一些,因为要考虑SortOrder方向以及NullOrdering的次序。有时查询的顺序(比如DESC)与用户定义SortOrder字段时的顺序(比如ASC)不一致。我们扩展了Trino的逻辑计划树,增加了一个LastNNode节点,对应的算子是LastNOperator。如果查询的顺序与定义的顺序一致,那么直接生成LimitNode节点即可。

其次是NullOrdering的处理,SQL2011标准对NullOrdering的次序没有做出明确的限制,具体的实现取决于不同的数据库软件。以Iceberg为例,如果按照ASC顺序写入,则默认的NullOrdering实现是NullFIRST。但是,Trino查询中默认的是NullLAST。另外,由于datafile文件已经排序,所以在某些场景下将数据文件再切分为split是一个不需要的优化。比如,前面的例子中,我们查询的是前200行,那么我们只需要读取49847个文件的前200行就可以了。若是为了提高并发将这些文件切分为506942个split,那就需要读取506942个split,这显然是没有意义的。

我们支持了读和写的SortOrder完全相同或完全相反的场景,而这个场景恰好是我们的日志场景下的需求。我们的日志场景下写模式为ASCNULLFIRST,读模式为DESCNULLLASTLIMIT200。

下图描述了PartialTopN下推后的性能对比。

我们希望一种写入的顺序(ASC)能够支持两种不同的读取顺序(ASC,DESC),目前我们确实也支持了这样两种读取顺序。但是,若写入的数据为1、2、3、4、5,而读取时希望读取到4和5,这时,我们会将所有的数据读入,但是抛弃了1、2、3,时间复杂度还是O(N)。第一个直观的想法是,我们是否可以从文件末尾开始读取。这从操作系统层面讲,由于读取局部性的原因,操作系统会将5后面的数据读取到内存,但实际上你又不需要这部分数据。所以,当读写顺序不一致时,目前LastNOperator算子的性能提升不明显,只有1到2倍的性能提升。另一个直观的想法是,改造OrcReader,从内存中逆向读取rowgroup,时间复杂度就可以从O(n)下降到O(ceiling(count/rowgroup)*rowgroup)。比如,当rowgroup的大小为10000行时,读取200条数据的话,只需要读取一个rowgroup即可达到目标,而不是读取所有的rowgroup。

我们在稳定性优化方面做的工作工作比较多,这里仅介绍一些比较重要的点。

我们通过以下限制增加了查询的稳定性:

Yuuni是一个基于HTTP协议的,用于统一访问Trino/ClickHouse集群的代理网关,对所有分析查询请求进行认证、监控、路由、缓存、限流等控制。

若我们有cluster-cs-01和cluster-cs-02两个集群,当我们需要对cluster-cs-01集群的Trino进行升级时,借用Yuuni的代理能力,将cluster-cs-01的查询无缝转移到cluster-cs-02上去,待cluster-cs-01升级完毕,再将查询切回cluster-cs-01上来,全部过程对用户透明无感知。

我们Trino集群目的有2套部署方式,一套部署在物理机上,另一套部署在容器化环境,长远看来会全部迁移到容器化环境中。

物理机部署的过程是:在代码仓库gitlab上构建,手动将tar包下载到本地,然后将该tar包分发到所有的机器上,最后利用公司的脚本执行平台将该tar包启动起来。这个过程的缺点是:回滚困难且全程几乎人肉操作,CPU利用率比较低。优点是:运维工具多,基于物理机可以随意下载想要的运维工具,排查问题比较方便,操作简单。

容器化一站式部署环境

容器化的部署流程如下图所示。代码在gitlab上,会自动关联平台进行构建,在选择任意的tag/分支/commit_id自动构建后,会自动关联计算平台,在计算平台上选择CPU和内存规格后,就直接发布了。而且,回滚也是非常方便的。

以上就是本次分享的内容,谢谢大家。

关于超大尺寸哔哩哔哩到此分享完毕,希望能帮助到您。



本文由欣欣吧手游攻略栏目发布,感谢您对欣欣吧的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人站长或者朋友圈,但转载请说明文章出处“超大尺寸哔哩哔哩?在哔哩哔哩湖仓一体化平台中的实践

标签:
超市特工(女星将客串)
« 上一篇 2024-01-08