01 湖仓架构
腾讯大数据的湖仓架构如下图所示:
这里分为三个部分,分别是数据湖计算、数据湖管理和数据湖存储。
数据湖计算部分,Spark 作为 ETL Batch 任务的主要批处理引擎,Flink 作为准实时计算的流处理引擎,StarRocks 和 Presto 作为即席查询的 OLAP 引擎。数据湖管理层以 Iceberg 为核心,同时开放了一些简单的 API,支持用户通过 SDK 的方式去调用。在 Iceberg 之上构建了一套 Auto Optimize Service 服务,帮助用户在使用 Iceberg 的过程中实现查询性能的提升和存储成本的降低。数据湖底层存储基于 HDFS 和 COS,COS 是腾讯云的云对象存储,可以满足云上用户的大规模结构化/非结构化存储需求,在上层计算框架和底层存储系统之间,也会引入 Alluxio 构建了一个统一的存储 Cache 层,进行数据缓存提速。本次分享的重点主要是围绕智能优化服务(Auto Optimize Service)展开。
02 智能优化服务
智能优化服务主要由六个部分组成,分别是:Compaction Service(合并小文件)、Expiration Service(淘汰过期快照)、Cleaning Service(生命周期管理和孤儿文件清理)、Clustering Service(数据重分布)、Index Service(二级索引推荐)和 Auto Engine Service(自动引擎加速)。以下就各模块近期做的重点工作展开介绍。
1. Compaction Service
(1)小文件合并优化
小文件合并有读和写两个阶段,由于 Iceberg 主要以 PARQUET/ORC 列存格式为主,读写列存面临着两次行列转换和编解码,开销非常大。针对这个痛点,我们对 Parquet 存储模型进行了分析,主要由 RowGroup、Column Chunk、Page 以及 Footer 组成,相对位置如下图所示,不同列的最小存储单元以 Page 级别组织,数据水平方向上以 RowGroup 大小划分数据块,以便上层引擎按照 RowGroup 级别分配 task 加载数据。
基于存储模型的特点,我们针对性地采用了 RowGroup Level 和 Page Level 两种拷贝优化,对于大文件合并大文件且仅涉及重新压缩、仅涉及列裁剪的场景,使用 RowGroup Copy;对于小文件合并大文件、不涉及列变化、不涉及 BloomFilter 的场景,使用 Page Copy。
下面是我们内部全部升级优化之后的落地效果,合并时间&资源减少 5 倍多。
(2)更多优化
我们还增强了 Delete Files 合并优化和增量 Rewrite 策略。
在大规模 Update 的场景下,会产生大量的 Delete Files,数据读取时会频繁地进行 Delete File Apply Data File 的操作,这个过程是串行的,I/O 开销巨大。当合并的速度低于 Delete File Apply 的速度,就会因为积攒了大量的 Delete Files 导致合并失败。针对这个痛点,我们使用 Left Anti Join 拆分出了关联 Delete File 的 DataFile 和未关联 Delete File 的 DataFile,然后将两者进行 Union All。此外还在 Delete File Apply Data File 的过程中使用了 Bloom Index 加速寻找,及时删除未关联 Data File 的 Delete File。
增量 Rewrite 优化会通过在 DataFile 中引入 Modify Time 来决策,进行分区级别的增量更新。
2. Index Service
(1)Iceberg Core Framework
Iceberg 较 Hive 增加了 min-max 索引,记录了 DataFile 所有 column 列的最大值和最小值,在执行引擎计算时可以协助做文件级别的过滤,但是文件级别的索引粒度较粗,在随机写数据的时候 min-max 存在交叉,导致索引失效。所以我们在这个基础之上进一步拓展了二级索引,来提高 Data Skipping 的能力,加速查询。索引的构建和加载过程在 Iceberg Core 层的框架支持实现如下:
(2)Iceberg scan metrics
对于专注于业务开发的用户来说,索引的选择往往是比较困难的,如何精准的判断是不是需要索引,需要什么索引,索引是否有效,索引是否会带来副作用等,往往需要经过一些额外的任务来进行分析,如果靠用户自己的决策选择,获得大规模的适配收益很难。基于这个想法,我们做了智能推荐索引的支持,而智能的推荐,首先是需要一套 metrics 框架的支持,能够记录表的 Scan,Filter 等各种事件,收集 Partition Status 信息,然后对这些事件进行分析,统计列的查询频次,过滤条件,根据规则区分高/低基数列等。最后根据分析结果,进行 Index 的推荐。
(3)索引智能推荐流程
整个端到端的 Index Service 流程如下图:1)首先是 SQL 提取,由于我们获取到的 SQL 是引擎优化后的,并不是原始 SQL,所以需要进行 SQL 重构。2)是索引粗筛,根据拿到的信息,比如列和分区的查询频度,初步判断怎么建立索引是有效的。3)开始尝试构建索引,支持构建分区级别增量索引。4)在用户无感知的情况下进行任务双跑。5)根据双跑结果进行索引优化的效果评估。6)将索引优化数据输出给用户,推荐用户使用。7)由于索引构建是复杂的,一个表会被多任务引用,一个任务也会去访问多张表,我们提供任务级别和表级别的索引构建,尽可能实现表级和任务级的同步优化。
3. Clustering Service
由于 Iceberg 的 min-max 索引在随机写的情况下是普遍失效的,导致 Data Skipping 能力较差,所以如果需要精确覆盖 min-max,可以将数据进行重排分布。当用户进行单列查询的时候,提前对数据列排序写入,如果是多列查询的情况,由于无法保证多个列都分布在一个文件中,我们使用 Z-order,对每个列进行数字化处理,采样计算 Range-ID,生成交错位Z-Value,根据 Z-Value 进行重分区,可以保证不同列之间的相对有序性。
实际业务中,Data Clustering 和 Data Skipping 都实现了四倍以上的效果提升。
4. AutoEngine Service
相对于 OLAP 引擎来讲,Iceberg 表,Hudi 表都是外表,这些外表基本都是 TB 级别,使用 StarRocks,Doris 查询外表并不能发挥 OLAP 的查询优势。AutoEngine Service 通过收集 OLAP 引擎的 Event Message,对相应的分区进行加热,也就是将相关分区数据路由到 StarRocks 集群,上层引擎可以在 StarRocks 集群中发现该分区的元数据,由此实现基于存储计算引擎的选择优化。
03 场景化能力
1. 多流拼接
关于多流拼接,这里举个例子简单说明, 如图所示,有两个 MQ 同时往下游写数据,MQ1 更新列 data1,MQ2 更新列 data2,最终根据 id 聚合,取时间戳 orderColumn 排序最靠前的一条,作为 join 之后的 source。要实现这个合并更新能力,往往需要外接各种临时存储 Redis/Hbase/MQ 等组件。
那在 Iceberg 层面是怎么优化的呢?由于 Iceberg 本身支持事务和列级的更新删除操作,类似于代码仓库的 Branch 概念,因此可以通过打 tag 的方式去标记状态。具体实现是,初始化阶段,数据写入主流程,同时多流往其他 Merged Branch 去写入,写完之后的话会有一个异步的 Compaction 任务,定期和主流程合并,当用户在读的时候,直接读取 Merged Branch。
2. 主键表
通过多流 Join 的实现方法依赖 Compaction Service 的调度性能,当数据规模不断增加,多流 join 聚合计算更新的拼接方式可能存在性能瓶颈,所以我们也引入主键表作为行级更新的另一种实现方式。比如这里我们根据 id 分成四个桶,存在多个任务往一个桶去写数据,一个桶内的数据是有序的,那么下游在读取桶数据的时候会更轻松。但是当 id 的基数很大的时候,比如当 id 为 4/8/16 的时候,都会往一个桶内写数,会产生 DataFile 的重叠,在下游从桶内读数的时候,就需要合并一个桶内的多个 DataFile 到一个 Reader 处理。如果分桶数量设置的不合适,单点压力就会过大,此时可以使用 Rescale 实现桶的弹性扩缩容。另外在桶的基础上扩展列族 Column Family 的概念,相当于每个列都作为独立的文件写入,多个 Column Family 行拼接 Full Outer Join 即可。
3. In Place 迁移
由于对数据湖的高阶特性能力的需要,很多业务做了架构的升级,同时也面临着存量 Thive(腾讯自研 Hive)和 Hive 的数据迁移到 Iceberg。这里需要重点支持的工作包括:存储数据的迁移,计算任务的迁移。
首先存储数据的迁移,我们提供了 data in-place 的方案,不搬移原来的 data files,仅仅重新生成 Iceberg 新表所需的 metadata 即可,迁移的过程支持了 STRICT/APPEND/OVERWRITE 等三种模式。
其次是计算任务的迁移支持, 我们改进支持了新的 Name Mapping 机制,增强支持了 Identity partition pruning 能力,使得对于场景的 built-in functions 裁剪能力取得数量级性能提升,优化实现如下:
4. PyIceberg
Iceberg Table Spec 是开发性的实现,可以支持多种语言 API 接入,AI生态圈数据科学等主要以 Python 环境为主,要求高性能 Native 解码,对 JVM 环境无强依赖,PySpark 虽然具备接入 Iceberg 的能力,但是太重了。我们可以直接利用 PyIceberg 能力,无JVM 依赖,加载解码一次即可,提供广泛的机器学习类库的优势,拓展 Python的技术栈到 Iceberg 元数据层面,构造 Pandas,Tensorflow,Pytorch 等不同的 DataFrame,方便进行数据分析和 AI 模型训练的编程探索,我们内部也深度支持了 PyIceberg SQL 的列裁剪和谓词下推能力,结合 DuckDB 做一些小数据集的算法快速调试。
04 总结和展望
未来还将从以下方面着手,进行实时湖仓的优化:
1. Auto Optimize Service
冷热分离降本提效
物化视图提速
AE 服务智能化感知
Compaction 能力打磨
更多 Transform UDF Partition Pruning 优化
2. 主键表优化
拓展 Deletion Vector,解决谓词下推必须联合去重的性能问题
3. AI 探索
落地适合模型训练的湖仓格式。
探索实现分布式 dataFrame,整合 metadata 和引擎。