在降本增效的大背景下,为满足业务对更高性能的需求,流式计算团队对 FlinkSQL 进行了深度优化。本文将聚焦这一实践,详解主要优化思路。
随着抖音集团内部对流式任务的需求不断增长,Flink SQL作为一种低成本接入手段,已经在内部多个方向上得到大规模应用。目前,流式 SQL 任务的规模已经超过3万,任务资源使用量和分配量也达到了百万core。
在降本增效的大背景下,为了解决资源紧缺的问题,并同时满足业务对更高性能的需求,流式计算团队对 FlinkSQL 进行了深度优化,本文将聚焦这一实践,详解主要优化思路。
01 Engine 优化
/ 查询优化
1. View Reuse
在流式 SQL 中,为增加 SQL 代码的可读性,通常会将通用的计算逻辑放在 view 中。在这里,view 只是一个逻辑概念:在底层实现时,并没有真实存储中的 view 与之对应。
如下图所示,场景一表示任务中存在多个 sink 的表,view 中是窗口聚合运算的逻辑。场景二表示任务需要对两个流进行 union,view 中是普通聚合运算的逻辑。
在这两种场景中,用户会定义一个通用的 view 来进行计算。因为下游不同分支对 view 的查询不同,view 中的计算逻辑会在不同算子中重复计算,由此带来了重复的资源开销。
那么问题就在于:为什么 view 没有被复用?
在 Calcite 的原有逻辑中,view 中包含的 Query 会被立即转化成一颗关系表达式树。如果有多条Query 访问了同一个 view,那么就会获得多颗属性完全相同,但分属于不同 Java 对象的 RelNode Tree。
因此,后续所有优化都是基于不同的子树对象分别进行的,无法再重新合并成同一棵树。
● Multi-sink
多 sink 的场景下,在生成 logical plan 阶段时,view 会被 Calcite 转换为多个 RelNode Tree。在后续 optimizer 的子图划分中,这些 RelNode Tree 不会被划分到相同子图中,从而导致 view 不能被复用。
由此可以看出,解决问题需要分别从 Calcite 和 Flink 入手。
在Calcite 的 SqlToRel Convert中,不应立即将 view 中的 Query 转化成对应的 RelNode Tree,而应直接返回包含了对应 Sql CatalogView Table 的 LogicalTableScan。
在Flink中,CatalogView 的实现需要将 LogicalTableScan 对象存储下来,让下游节点都引用同一个 CatalogView。在优化之前,将 LogicalTableScan 中的 view 展开成 RelNode Tree,以便下游节点能够引用相同的 RelNode Tree 对象。
● Union all
在 Union all 场景中,为了复用 view,可以在 view 后面增加一个虚拟的 sink 节点,将 Union all 场景转换为多 sink 场景。这可以使 view 在 logical plan 阶段时,不会提前展开成 RelNode Tree,而 union 也能够引用到相同的 View 对象。虚拟的 sink 节点则会在子图划分后被删除。
从上述两个场景中可知,在进行了 view 复用优化后,view 对应的计算逻辑只需计算一次,整体 CPU 收益为20%。
2. Remove Redundant Streaming Shuffle
Remove Redundant Streaming Shuffle 可以移除流式场景下不必要的数据分发开销。在批式场景中, shuffle 操作会有落盘的性能开销,这已经在社区中得到了优化。而在流式场景中,shuffle 操作则有序列化和网络传输的开销。
如下图例子所示,在计算不同品类产品价格 Top5 的平均值时,使用了排序和聚合计算。在排序和聚合前对 id 进行了 hash,这说明两个算子有相同的 hash key。数据被 rank 算子 hash 后,就不需要再进行第二次 hash 了,这说明第二个 shuffle 是多余的。
shuffle 是在生成 physical plan 的阶段中产生的。下图展示了 Sql 优化器将 SqlNode 从逻辑节点转换为物理节点的过程,在这个过程中,shuffle 也就是 exchange。
转换过程是通过规则进行的,在 relRule.Convert 过程中会遍历每一个逻辑节点,判断当前节点是否满足转换规则,如果存在不满足的情况,就会增加一个 AbstractConvert。
在生成 Exchange 的规则中,会判断当前节点的数据分布特征是否满足需求,如果不满足,就在节点上游增加 Exchange 节点来满足数据分布的特征。最后,PhysicalExchange 会被转换为 hashShuffle,用于数据的分发。
如何移除掉多余的 Streaming Shuffle?针对该问题,主要思路是参考 Batch 对 Shuffle 的优化。在规则转换的过程中,不仅要考虑节点本身,还要考虑输入节点的特征是否满足需求,将问题往上抛。
实现针对 Physical RelNode 的规则判断方法,主要分为以下两种情况:
● 对于本身没有数据分布特征的节(如 Calc 和 Correlate Node),判断它们能否满足一个特定数据分布的需求,只需检查自身输入中是否包含 hash key。
● 对于本身有数据分布特征的节点(如 Aggregate 和 Rank nodes),需要确认本身的数据分布特征是否满足给定的 distribution requirements。
如下图所示,首先要检查 aggregate 节点是否满足数据分布特征,这需要查看它的输入,即 rank 节点是否满足要求。如果 rank 节点不满足,则需要在其上游添加 exchange 节点。添加后,rank 算子满足了数据分布特征。由于 rank 和 aggregate 的 hashkey 相同,因此 aggreagte 也满足了。
该方法可为火山模型提供更优、成本更低的执行计划。火山模型最终将选择这个移除多余 Exchange 的执行计划。移除多余的 streaming shuffle 后,rank 算子和 agg 算子中的 hash 连接已经消失,并且 chain 在一起,整体 CPU 收益达到了 24%。这也为在 Streaming 场景下优化 MultipleInput 的算子提供了可能。
/ 查询执行优化
1.Streaming MultipleInput Operator
基于 Remove Streaming Shuffle,在对多余的 hash shuffle 进行优化的前提下,可以在 join+join、 join+agg、join+union 中,对shuffle 进行更深层的优化。
如下图所示,因为 agg1 hash key 和 Join left key 相同;agg2 hash key 和 Join right key 相同,所以可将 Join 前的 hash 变为 forward。
当前的 OperatorChain 策略不支持多input算子的 Chain,无法避免因多余 shuffle 而导致的序列化、反序列化和可能的网络开销。因此,流式计算团队使用 MultipleInput 机制,在 Streaming 场景下,将多个 Input 的算子上下游合并为 MultipleInutOperator,从而进行优化。
具体而言,优化经历了以下几个步骤:
(1) 首先,在 Planner 层构建出 MultipleInputExecNode
MultipleInputExecNode 是在 logical physical 计划后,当 plan 被转换为ExecNode DAG时,从 ExecNodeDAG 中推导而出。获得 ExecNodeDAG 后,先从根节点进行广度优先搜索,从而获取图的拓扑排序。构建 MultipleInputExecNode 是在 Covert ExecNode DAG 环节进行的,完成这一系列操作后,它将在 ExecNode Graph 中构建出来。
(2) 在生成 StreamMultipleInputExecNode 后,被 translate 成 StreamMultipleInput transformation
在 transformation 中,包含了创建 MultipleInput Operator 的一些信息,通过 TableOperatorWrapper 存储 sub-op 信息。
(3) 生成 Job Graph
这需要满足以下2个条件:
●StreamConfig 需要兼容 Multiple Input 从 two Input 的 TypeSerializer1,2变成 TypeSerializer[],这主要用于 state/key 数据传输。
●Stream Graph 可以添加 MultipleInputOperator 节点,通过方法 addMultipleInputOperator,将 Transformation 对应的 properties 添加到 vertex 中构成 Stream Graph 中的节点。
运行时实现了 StreamingMultipleInputOperator ,且需要考虑算子的创建,算子的数据处理,状态,Timer&&Watermark,Barrier,Checkpoint 等问题。
● Operator initialization
不只要创建 StreamingMultipleInputOperator,也要创建对应的 sub-op;
sub-op 本质上是 Abstract StreamOperator,sub-op id = op id + index;
在 createAllOperator 创建每个 sub-op 对象,并构建 DAG 的输入输出。
● ProcessElement
处理数据过程中要保证 key 的传递。
● State
MultipleInputStreamOperato 和 sub-op 分享state handler;
创建新的 API stateNameContext 来解决状态名字冲突。
● Timer && Watermark
MultipleInputStreamOperator和sub-op 分享 timeServiceManager;
创建新的 api TimerNameContext来解决状态名字冲突;
timeServiceManager 以 sub-op粒度管理 timer;
使用Combindedwatermark 来保证 Watermark 对齐。
● barrier
此处无需过多考虑,MultipleInput Operator 内部没有 buffer 中的数据,因此按照拓扑顺序进行 checkpoint 不会丢失数据。但需要注意的是,需要将 prepareSnapshotPreBarrier 从 MultipleInputStreamOperator 传播到所有子算子。
经过优化后,agg+join 操作会被合并到 MultipleInput 算子中,这将带来10%的 Cpu 收益,同时也会解决网络内存不足导致任务无法启动的问题。
2. Optimization of Long Sliding Windows
● 长滑动窗口及其底层实现逻辑
在 Flink SQL 中,长滑动窗口的具体写法是 Hop(table, slide, size)。其中,size 表示窗口的大小,slide 表示窗口移动的步长。在滑动窗口中,如果步长小于窗口大小,那么会有元素属于不同的窗口。
在滑动窗口计算中,如果窗口时间周期长,在大流量场景下计算7天、30天等时间段的uv并进行去重的操作时,会出现计算中数据延迟特别严重,甚至数据无法推动的问题,即便增加资源也无法解决这一问题。
经过对滑动窗口底层实现逻辑的分析,可知滑动窗口计算的主要性能瓶颈在于窗口计算最小的单位——窗格(pane)的合并操作。pane 是窗口大小和步长的最大公约数,大多数时候,pane 的大小都是 1。每次滑动窗口触发计算时,均需要把当前窗口下对应的所有窗格数据重新合并一遍。由于长窗口下其窗格数量很多,所以性能开销很大。
● 长滑动窗口优化思路
对此,主要的优化思路是以空间换时间:
(1)在窗口算子中定义全局状态,存储当前窗口的计算结果;
(2)在聚合函数中新增 retractMerge 方法,窗口向后滑动时,移除被划走窗口的数据;
(3)触发下一次计算时,合并新增窗口的数据。
如下图所示:在窗口向后滑动 3 个窗格时,移除 pane1-pane3 的结果,再合并进来 pane 11-pane13 的结果。总共需要计算6个窗格,优化了4个窗格的计算。
因此,当窗口大小和滑动步长的比值越大,优化效果就越明显。优化后,整体 CPU 收益达到了 60%。
/ 数据处理(Format侧)
1. Native Json Format
目前,抖音集团公司内部约有1.3万个任务使用 Json Format,占用资源近 70 万core。如果按照 5%的占比进行保守估计,线上约有3.5万core用于 Json 的反序列化,因此该部分有较大优化空间。
下图展示了数据从消息队列(MQ)中读取,并最终传递给下游运算符的主要流程。其中,Json 反序列化和将 GeneralRowData 序列化为字节,是两个重要的开销。
针对上述两项重要的资源消耗,主要从以下两个方面进行优化:
● 针对 Json 反序列化开销
使用支持向量化编程的 c++ json 解析库, 选择字节内部自研的 sonic-cpp,来提高性能。
● 针对序列化为 binaryRowData 的开销
使用 native 方法直接产出 BinaryRowData 所需要的二进制表示,再使用 BinaryRowData 指向这一部分数据,从而免去序列化对应的开销。
在测试集中,native Json 的 CPU 收益能够达到 57%。
02 优化实践
为了确保引擎优化能够给业务方带来实际的优化效果,流式计算团队在内部做了大量工作,以确保优化项能够稳定上线,以下将对此展开详细介绍。
1. 工具层
如上框架图所示,最下层是工具层,具备以下5项能力:
● 支持 SQL 任务元信息实时上报;
● 算子粒度离线数仓,提供算子粒度的任务监控;
● Commits 粒度 DAG 兼容性检查 :可以提前发现哪些优化项会影响任务状态恢复;
● 优化项分优先级灰度:可以限制风险暴露范围;
● 数据准确性链路构建:保证了上线优化项不会导致数据准确性发现问题。
基于上述能力,工具层实现了算子粒度的任务监控,同时保证了任务稳定性和数据准确性。
2. 优化层
在优化项这层,对存量优化进行推广上量或全量,同时也对很多新增优化项展开探索和推广。
3. 引擎&平台层
在引擎&平台层,与业务方协作,推动存量任务治理。通过在平台侧进行优化项配置,使新增作业能够直接应用某些优化项。同时,经过校验的优化项将在引擎侧中默认开启。
经过优化,最终获得了 10w core+ 的性能收益。
在未来,流式计算团队将持续优化 FlinkSQL,探索 Join 中状态的更优使用方式。同时,也会在流批融合 native Engine 等方向上持续探索发力。