服务器 频道

Apache Celeborn 在B站的生产实践

  背景介绍

  Shuffle 演进

  随着B站业务的飞速发展,数据规模呈指数级增长,计算集群也逐步从单机房扩展到多机房部署模式。多个业务线依托大数据平台驱动核心业务,大数据系统的高效性与稳定性成为公司业务发展的重要基石。如图1,目前在大数据基础架构下,我们主要采用 Spark、Flink、Presto 以及 Hive 作为计算引擎支撑各类复杂业务场景需求,离线计算集群基本每天运行30+万左右的 Spark 作业,包括任务调度平台的 ETL 任务、Kyuubi 提交的 Adhoc 作业等,其作业的 Shuffle 数据规模能够达到30PB以上,同时单个作业的 Shuffle 量最大规模有几百TB。同时 Shuffle 算子作为大数据计算引擎中间数据处理最重要的算子,Shuffle 的稳定性关系着线上大量离线作业的可靠性和性能。因此,对于海量的 Shuffle 中间数据和复杂多变的计算环境来说,保证 Shuffle 数据处理的稳定性对线上作业的稳定性和运行效率尤为重要。  

图1:B站大数据基础架构图

  Local Shuffle 的早期引入

  前期我们采用 Spark 社区官方提供 External Shuffle Service(ESS)方案,生产实践过程中,我们发现 Spark 任务运行过程中 ESS 有稳定性差的情况,主要表现在:

  NodeManager, ESS, Datanode 同时部署,进程之间共享网络带宽,容易导致各组件之间互相影响,节点稳定性差。

  高网络连接数:Shuffle Read 有大量的网络连接,逻辑连接数是M*N。对于 M*N 次的连接数而言,容易导致线程池消耗过多 CPU,影响作业的性能和稳定性。

  大量随机读:Shuffle Read 存在大量的随机读盘,假设一个 Mapper 的 Shuffle 数据是 128M,Reducer 的并发是 2000,那么每个文件将会被读 2000 次,每次只随机读 64k,非常容易达到磁盘 IOPS 瓶颈,导致大量小粒度随机读,同时 HDD 盘随机读性能非常差;而 SSD 盘快速消耗磁盘寿命。

  External Shuffle Service 集群无法弹性扩缩容,难以灵活应对热点数据的大任务。

  Executor 只会将数据写入其本地存储,并且不会跨多个节点复制数据,意味着如果与 Executor 共存的 Shuffle 服务崩溃,则此节点上所有 Executor 写入的 Shuffle 数据都会丢失,每个在该节点上写入 Shuffle 文件的 Stage 都需要重新计算,消耗额外计算资源。

  Push Based Shuffle 的演进

  中期为了降低 Fetch Failed Exception 造成的作业失败率,选择基于 Spark 社区提供的 Push-based Shuffle 方案,此方案的优缺点主要是:

  优点:

  1. 对 AQE 支持友好:Push-based Shuffle 方案天然支持 Spark 的动态查询执行(AQE),在分区重排和数据分布优化中表现优异。

  2. 减少内存压力:通过将中间数据写入本地磁盘再异步推送到远程节点,减少内存占用。

  缺点:

  1. 写放大问题:由于需要先将数据写入本地磁盘,整体的 I/O 开销有所增加,尤其对小任务不够友好。

  2. 冷启动问题:作业启动时,Driver 管理的 Executor 数不足,导致可供选择的远程节点有限。SQL 作业中的数据量通常随过滤条件逐渐减少,但在初始 Shuffle 阶段可能因节点不足 Fallback 到原生 Shuffle 实现。

  为了解决写放大以及冷启动问题,引入 Shuffle Service Master 节点,主要优化点包括:

  Shuffle Master 的引入:

  1. 注册和心跳机制:所有 External Shuffle 节点启动时都会向 Shuffle Master 节点注册,并且周期性上报心跳和节点的繁忙程度。

  2. 集中化的节点管理:DAGScheduler 请求远程节点时,通过 Shuffle Master 动态分配节点,从而缓解冷启动时节点不足的问题,同时能够选择健康度更高的节点。

  性能优化:

  1. 基于任务画像的动态启用:

  小任务由于等待时间的回退问题对性能影响较大,因此并不强制启用 RSS。

  基于 HBO(History-based Optimization)系统对任务进行画像分析,仅对大任务启用 RSS。

  2. 性能测试结果:目前大任务启用 RSS,任务平均执行时间缩短 25%,稳定性显著提升。

  改进后 Push-based Shuffle Service 的主要流程如图2所示:  

图2:优化后的 Push-based Shuffle Service 流程图

  采用 Shuffle Service Master 优化的 Push-based Shuffle 方案能够带来以下收益:

  1. 冷启动问题的解决:通过集中化的节点管理,作业启动时能快速分配健康节点,避免因节点不足回退到原始 Shuffle 模式。

  2. 性能提升:大任务使用 RSS 后平均执行时间缩短 25%,有效应对高并发场景下的大数据处理需求。

  3. 稳定性提高:动态节点分配和健康度监测显著提升服务的可用性。

  B站通过上述优化的 Push-based Shuffle 方案基本解决 Spark 作业的 Shuffle 稳定性问题,但随着架构演进和对 Kubernetes 混部环境的适配需求,此方案的局限性逐渐显现,尤其是对本地盘的依赖成为无法支持存算分离的主要瓶颈,由此为了适配满足 Spark On Kubernetes 方案需求,尝试在生产环境使用主流开源 Remote Shuffle Service 实现方案。

  最终我们选择了 Celeborn 主要关注以下几个方面:

  1. Celeborn 开源社区活跃度相比其他开源 Remote Shuffle Service 实现方案更高。

  2. Celeborn 源于阿里云自研的 EMR Remote Shuffle Service,具备良好的架构设计和稳定的生产实践输出。

  3. 离线计算团队内部具备 Celeborn 的技术储备,技术演进代价相对较低。

  4. 稳定性和性能方面具备流量反压控制、磁盘负载均衡、优雅滚动升级、支持多层存储、提供双副本机制等关键功能特性,以及异步化操作包括异步 Push、异步 Flush、异步 Commit 以及异步 Fetch 满足不论是在读写还是 Control Message 过程中不阻塞计算引擎处理要求。

  5. 集成 Native 引擎比如 Gluten、Blaze 等,未来对B站向量化引擎演进大有裨益。

  6. 使用云原生的 Push-Style Shuffle 代替 Pull-Style

  Celeborn 概览

  Apache Celeborn 是由阿里云捐赠给 Apache 软件基金会的开源大数据计算引擎通用 Remote Shuffle Service,旨在解决大数据引擎处理中间数据遇到的性能、稳定性及弹性问题,专注于处理中间数据提高中间数据的流转效率,从而提升 Shuffle 的性能、稳定性和弹性。Celeborn 的定位是大数据引擎统一中间数据服务,通过接管中间数据解决传统 Shuffle 问题。目前 Celeborn 已经在包括阿里云、小红书、Shopee、网易、LinkedIn 以及 Pinterest 等在内的多家国内外公司生产落地,同时 Celeborn 开源社区活跃度也非常高,吸引来自海内外开发者贡献功能特性,性能优化以及 Bugfix。

  如图3所示,Celeborn 的整体架构采用典型的 Client-Server 架构,主要由 Master、Worker 以及 Plugin 三个重要部分组成,其中:

  服务端包括:

  1. Master 的主要职责是管理 Celeborn 集群状态,负责分配 Slot 负载,同时基于 Raft 协议实现高可用。

  2. Worker 的主要职责是接收、存储和服务 Shuffle 数据,实现多层存储适配各种硬件环境。

  客户端包括:

  1. Lifecycle Manager 负责管理当前作业的 Shuffle Metadata,把 Shuffle 元数据从 Master 转移到 Celeborn Application,使得 Application 管理自己的 Shuffle 数据,从而大幅降低 Master 的工作负载。

  2. Shuffle Client 主要使用于每一个 Executor 或 TaskManager 上面,负责执行具体推送和读取 Shuffle 数据。  

图3:Celeborn 核心架构图

  可以看到,Celeborn 的 Master、Worker、Lifecycle Manager 以及 Shuffle Client 是与引擎无关的,没有引入任何大数据引擎的依赖,计算引擎通过 Shuffle Client 提供的 API 集成 Celeborn,官方已支持集成 Spark、Flink 和 MapReduce 三种计算引擎,正在支持集成 Tez 引擎。同时 Shuffle 数据不再存储在本地磁盘而是存储到 Worker 上面,解除计算节点对本地磁盘的依赖。

  生产实践

  部署运维

  如图4,Celeborn 集群独立部署在 Kubernetes 环境,Master 和 Worker 集群是单独部署的,其中:

  3个 Master 集群:每个 Master 集群部署3个 Master 节点,Master 节点开启 HA 独立部署服务。

  4个 Worker 集群:

  Worker 集群是异构磁盘的,采用 6TB NVME SSD 盘,总共270个 Worker 节点。

  Worker 节点单独部署服务并且独占机器,其中每台物理机网络流量50Gb,读+写6GB/s,机器配比为270:13.4Tb/s(总容量):4.53Tb/s(高峰入容量):1.31Tb/s(平均入容量)。

  Worker 集群按照部署机房维度分为常熟集群和嘉定集群,常熟集群按照作业优先级划分为低优集群和高优集群

  低优集群:运行 Spark(低优)、Flink 和 MapReduce 作业 Shuffle。

  高优集群:面向 Spark(高优)作业 Shuffle。  

图4:Celeborn 集群部署架构图

  目前 Celeborn 滚动升级使用 Master 集群尽量先升级一台非 Leader 的 Master 节点,后升级作为 Leader 的 Master 节点,Worker 集群按照升级单 Worker 节点-多个 Worker 节点-全部升级策略。其中 Celeborn 灰度发布 Worker 服务按照单个 Worker 节点-多个 Worker 节点(1%-10%-30%-50%-100%)-全部节点策略逐步发布,发布期间观察物理机指标包括 CPU 利用率、真实使用内存等,以及 Shuffle 过程各项时间指标包括 OpenStreamTime、FetchChunkTime、FlushDataTime、PushDataTime 等。同时选取多个典型的 Spark/Flink/MR 作业运行在灰度节点上面,对比运行时长,验证数据质量。在1%-10%灰度发布期间,如图5所示,灰度发布的 Worker 节点标注 Grayscale Tag,同时挑选一批灰度更新的 Spark/Flink/MR 作业采用 HBO 配置打 Tag 方式,此时灰度更新的作业能够运行在任意 Worker 节点,其余节点只能运行在除了灰度发布以外的 Worker 节点。  

图5:Celeborn 灰度发布 Worker 图

  监控指标

  Celeborn 基于 Dropwizard Metrics 库提供一个可配置的指标系统,此系统提供 Master 和 Worker 服务进程的监控指标,允许用户将 Celeborn 监控指标报告到多种接收端包括 HTTP、JMX、CSV 文件和 Prometheus Servlet。基于 Celeborn 支持的监控指标和 Shuffle 运行过程中的关键操作,如图所示,从集群指标、应用指标以及业务侧指标角度出发制定如图6、7所示的 Master 和 Worker 集群监控面板:  

图6:Master 集群监控面板图  

图7:Worker 集群监控面板图

  元仓建设

  Celeborn 缺乏观测 Shuffle 运行过程集群资源使用情况,同时缺乏作业细粒度运行统计信息,导致难以衡量 Shuffle 运行时集群资源分配以及治理大规模作业 Shuffle 体量。如何基于运行时集群资源使用合理分配作业 Shuffle 资源,根据 Shuffle 历史资源使用数据提供精准作业放量资源预估以及支持作业 Shuffle 性能诊断推动用户治理 Shuffle 异常作业是建设 Celeborn 元仓的主要目标。围绕观测 Celeborn 集群 Shuffle 资源使用目标,Celeborn 元仓按照集群和作业维度划分,其中集群维度包含 Worker 集群资源消耗概况,而作业维度则提供 Worker 集群作业细粒度资源使用明细。为了提高 Celeborn 集群分配 Shuffle 资源掌控,搭建 Celeborn 元仓服务通过收集 Celeborn 集群监控指标以及作业异常堆栈,对资源消耗指标按照各维度进⾏汇总同时持久化,采用数据服务层查询 Celeborn 资源消耗指标元仓数据提供元仓数据应用,其 Celeborn 元仓构建链路如图8所示包括:

  1. Celeborn 支持 /metrics/prometheus 接口提供 Master 和 Worker 监控指标,其中 Worker 新增作业细粒度资源消耗指标,用来提供作业维度资源消耗汇总数据,指标详情参考[CELEBORN-1501] Introduce application dimension resource consumption metrics of Worker。

  2. Worker 采用 Collector 模块调用 REST API 收集 Celeborn 集群相关监控指标实现 Agent 的 Collect 接口,由 Agent 负责推送 Worker 集群和作业维度资源消耗指标到 Kafka。

  3. 使用 Flink 把资源消耗指标数据导入 StarRocks,StarRocks 用来存储 Worker 集群和作业细粒度资源消耗指标数据。

  4. 元仓数据服务 BMR Data Service 查询 StarRocks 的 Celeborn 元仓数据,如图9.1、9.2、10所示  

图8:Celeborn 元仓构建链路图  

图9.1:Celeborn 失败作业异常分布图  

图9.2:Celeborn 元仓集群资源消耗概况图  

图10:Celeborn 元仓作业资源使用概况图

  智能路由

  HBO (History-Based Optimization) 是通过分析历史任务的运行日志、资源使用情况和性能指标,预测和调整未来任务的执行参数,实现自动化优化作业,提升集群资源利用率和任务执行效率。目前 Celeborn 基于 HBO 自定义高低优集群路由规则满足智能路由要求,即如图11所示,采用 JobParamRule 根据历史任务的 Shuffle 流量、当前作业的优先级、Celeborn 集群当前的 Shuffle 流量以及作业 Shuffle 流量阈值等路由到 Celeorn 高低优集群,保证基线任务能够使用高优集群运行 Shuffle 满足其资源使用,避免低优作业影响高优作业的 Shuffle 资源。  

图11:HBO 智能路由 Celeborn 集群图

  诊断治理

  Shuffle 卡顿是在大数据计算引擎在 Shuffle 阶段传输或存储效率低下,导致作业执行时间显著延长或阻塞无法按照预期时间完成。通常造成 Shuffle 卡顿现象的主要原因包括数据倾斜、分区数量不合理以及出现资源瓶颈例如内存不足、磁盘 IO 以及网络带宽瓶颈等。所以为了优化 Shuffle 卡顿现象,提升 Shuffle 过程性能,如图12所示通过 Celeborn Shuffle 治理大屏提供 Shuffle 卡顿情况分布、卡顿率变化趋势以及 Shuffle 卡顿作业详情,用来诊断治理 Celeborn Shuffle 卡顿问题,结合 Celeborn 监控指标和服务日志降低 Shuffle 卡顿率,从而大幅优化 Shuffle 性能缩短作业运行时间。当前衡量 Shuffle 卡顿问题依据 Shuffle Read 阶段 Shuffle Read Blocked Time 和 Shuffle Write 阶段 Shuffle Write Time 超过3分钟并且 Blocked Rate 超过20%判断 Shuffle 读写速率是否变慢。  

图12:Celeborn 诊断治理大屏图

  混沌测试

  仅依靠单元测试、集成测试、e2e 测试等无法满足验证服务可靠性,即难以覆盖生产复杂环境,例如坏盘、CPU过载、网络过载、机器挂掉等异常情况。为了测试 Celeborn 服务面对故障和不确定性时的稳定性、可恢复性和鲁棒性,通过引入 Celeborn 混沌测试框架,模拟生产可能出现的异常故障,校验 Celeborn 服务的稳定性和可靠性,同时需要保证满足 Celeborn 最小运行环境,即至少3个 Master 节点和2个 Worker 节点可用,并且每个 Worker 节点至少有一块盘。如图13,Celeborn 混沌测试框架主要由调度器(Scheduler)、执行器(Runner) 以及 命令行(CLI) 组件构成:

  1. Scheduler

  负责调度 Runner 执⾏测试计划。

  产⽣模拟生产故障的异常事件,当前已支持的异常事件类型如下:

  杀掉 Master 进程

  杀掉 Worker 进程

  Worker ⽬录不可写

  Worker 磁盘 IO Hang

  CPU ⾼负载

  Master 节点元数据损坏(随机测试模式不能执⾏此 Action ,在测试计划解析过程中直接报错)

  监控 Celeborn 集群资源状态,判断资源是否已经达到最⼩可⽤集合。

  2. Runner

  监控 Celeborn 节点资源状态,当前已支持的状态包括 Master 进程是否存在、Worker 进程是否存在、磁盘是否可读写以及 IO Hang。

  执行测试计划对应的异常事件,模拟可能发生的生产故障。

  3. CLI

  提交给 Scheduler 用户自定义测试计划。

  查询当前测试环境各个集群资源状态。

  暂停或者继续混沌测试流程。  

图13:Celeborn 混沌测试流程图

  Scheduler、Runner 和 CLI 组件之间的主要交互过程:

  Runner 在启动时向 Scheduler 注册自身。

  CLI 通过 SubmitCommand 向 Scheduler 提交 VerificationPlan,其中 VerificationPlan 包括:Actions、Trigger 和 Checker。

  Scheduler 按照循环周期重复执行接收到的 VerificationPlan:

  Scheduler 根据指定的策略触发 Action。

  Action 生成相应的 Operation。

  Action 根据 Deduced context 推导出 Operation。

  Checker 验证资源是否已达到推导上下文中定义的最小可用资源集。

  提交 Operation 到 Runner 线程执行。

  Runner 执行来自 Scheduler 的异常 VerificationPlan,并且将执行结果反馈给 Scheduler。

  SchedulerContext 根据 Operation 执行结果进行更新。

  Runner 监控节点资源状态并且将其汇报给 Scheduler,包括 Master 和 Worker 进程状态、磁盘读写情况以及是否 IO Hang 等状态。

  引擎集成

  Spark 集成

  Celeborn Spark Client 发布版本完全依赖 Spark 镜像,无法独立部署更新最新版本,影响 Celeborn Spark Client Bug 修复和性能优化发布效率。为了解耦 Spark 镜像发版,如图14所示采用 --jars 或者 --conf spark.celeborn.client.jar 方式指定 Celeborn Spark Client Jar 实现 Celeborn Spark Client 单独发版无需依赖 Spark 镜像。此方式依赖 [SPARK-45762] Support shuffle managers defined in user jars by changing startup order,同时此 Patch 解决设置 spark.executor.userClassPathFirst=true 时 CelebornShuffleHandle 的 Classloader 不一致问题,即 SparkShuffleManager 采用 ChildFirstURLClassLoader 加载创建 CelebornShuffleHandle 实例,而在创建 ShuffleWriter 和 ShuffleReader 检查当前 ShuffleHandle 实例是否是 CelebornShuffleHandle 类型时采用默认的类加载器 AppClassLoader,造成当 spark.executor.userClassPathFirst=true 时仍然使用 Spark 原生的 Shuffle 实现。  

图14:Celeborn Spark Client Jar 独立部署图

  Shuffle 数据损坏是 Spark 生产作业运行过程中经常遇到的问题,在绝大多数情况下数据损坏很难复现甚至难以排查定位问题根因。同时 Celeborn 支持 Spark 缺乏数据质量验证以及数据的完整性校验,造成数据的一致性问题需要用户端进行校验反馈,无法在计算引擎数据传输阶段发现 Shuffle 数据质量问题。为了验证 Shuffle 数据的完整性和一致性,诊断系统 Compass 根据解析 Event Log 提取 SparkListenerSQLAdaptiveExecutionUpdate 得到的 SparkPlanInfo 元数据,检查 Spark 作业的 Exchange 算子读写记录数是否相等校验数据质量即 Shuffle Records Written 指标值是否等于 Records Read 实现全局监测 Shuffle 数据质量状况,此校验排除 Limit、GroupBy,ReduceByKey,Join,Broadcast Join 等算子带来的不一致情况。通过如图15所示 Exchange 算子 Checksum 校验,在v0.3.1版本里发现如图22当 Mapper 已经结束从 Speculative Task 接收 PushData 或者 PushMergedData 时 PushDataHandler 触发 StageEnd 并且 ShuffleClientImpl 没有以 MapEnd 方式处理 StageEnd 导致此 Stage 的其他 Task 无法推送 Shuffle 数据造成数据丢失问题,详情参考 [CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task。  

图15:Exchange 算子 Checksum 检查图

  Flink 集成

  目前 Flink 部署采用流批混部模式,其中 Pipedline Shuffle 使用存算一体架构,对于 Blocking Shuffle 而言,大规模 Flink 批作业依赖存算分离的 Remote Shuffle Service,避免和 Pipedline Shuffle 产生激烈的磁盘资源争抢情况。得益于结合 Flink 内存管理机制、容忍 JobManager/TaskManager 重启恢复等支持 Flink 关键设计和重要特性,Celeborn 支持 Flink 有1000多个 Flink 批作业的生产验证,Celeborn 承接的最大 Flink Batch 作业单 Shuffle 超过30T,作业运行平稳、稳定性和性能优异。

  由于 Celeborn Worker 集群独立部署并且 Worker 服务独占机器,Worker 集群磁盘和内存利用率尚可,但是 CPU 利用率平均只有30%左右,整体 Worker 集群资源利用率一般。为了提高集群 CPU 利用率,实现机器资源的降本增效,采用 Worker 低优集群和 Flink Batch 混合部署方式,目前如图16所示 CPU 利用率最高可达到80%以上。在 Worker 集群资源利用率满足内部要求的标准的同时,对于 Worker 集群节点而言 Worker 服务是此机器上面的主战场,需要保障 Worker 服务所使用的资源以及服务本身的稳定性。如何基于集群合理分配服务资源以及保证服务的稳定性是实现 Celeborn 资源保障的主要目标,通过结合 Tracker 服务(提供资源调度层和计算引擎层问题节点,包含反馈模块记录资源调度和计算引擎是否采纳决策的问题节点,为计算引擎运行时问题制定灵活管控策略 )决策资源调度服务是否优雅驱逐作业实现保障 Worker 服务的资源占用,避免服务资源紧张时仍然调度 Flink 批作业运行,从而影响 Worker 服务稳定性,其端到端的资源保障链路如图17所示:

  1. Tracker 服务启动异步 Goroutine 请求元仓数据服务获取 Celeborn 资源紧张的 Worker 节点,元仓数据服务按照 CPU/Memory 高低水印线查询 Celeborn 元仓和主机元仓不可用的 Worker 节点信息。

  2. 根据元仓提供的不可用 Worker 节点资源使用情况,Tracker 服务决策一级调度的名单进行资源调度以及问题节点实施节点驱逐。

  3. Tracker 服务把决策得到的 Worker 问题节点通知到内部资源调度服务 Amiya Resource Manager,给需要驱逐的节点添加 Taint 阻止 ResourceManager 对其继续分配,进行优雅驱逐 Flink 作业,同时停止新作业往这些 Worker 节点调度运行。  

图16:混部集群 CPU 利用率图  

图17:Celeborn 资源保障链路图

  MapReduce 集成

  目前 Celeborn 支持 MapReduce 每日执行10000多次实例,其中离线集群采用混部模式基于任务和集群的资源画像通过 Yarn RMProxy & Federation 路由低优先级作业到混部集群,Celeborn 支持 MapReduce 接入 RMProxy 实现用户无感知配置 MapReduce Shuffle 运行在 Celeborn 上面,Celeborn 接入 RMProxy 主要流程如图18所示包括:

  1. FederationClientInterceptor 使用 RouterPolicyFacade 根据 ApplicationSubmissionContext 的 Application 类型是否为 MAPREDUCE 决定是否为 MapReduce 作业,只有 MapReduce Shuffle 默认采用 Celeborn。因为 TaskAttemptImpl 和 MRAppMaster 在同一个进程中,所以在 AMContainerSpec 里设置 Celeborn MapReduce 相关的环境变量例如 Celeborn Master Endpoints、MapOutputCollector 和 ShuffleConsumerPlugin 实现类等。

  2. TaskAttemptImpl 创建通用 ContainerLaunchContext 时通过环境变量获取 Celeborn Master Endpoints、MapOutputCollector 和 ShuffleConsumerPlugin 实现类,也是使用环境变量方式传递给 YarnChild。

  3. YarnChild 启动过程中开启 MapReduce Task 期间,从环境变量中获取 MapOutputCollector 和 ShuffleConsumerPlugin 实现类,用来设置 JobConf 的 mapreduce.job.map.output.collector.class 和 mapreduce.job.reduce.shuffle.consumer.plugin.class 配置。

  4. MRAppMasterWithCeleborn 启动时,读取 Celeborn Master Endpoints 环境变量,用来配置 mapreduce.celeborn.master.endpoints 创建 LifecycleManager 调用 Shuffle Client API 读写 Shuffle 数据,详情参考 [CELEBORN-1460] MRAppMasterWithCeleborn supports setting mapreduce.celeborn.master.endpoints via environment variable CELEBORN_MASTER_ENDPOINTS。  

图18:RMProxy 接入 MRAppMasterWithCeleborn 流程图

  规划展望

  目前 Celeborn 在B站大规模生产落地取得显著收益效果,整体而言大幅提升 Spark、Flink 以及 MapReduce Shuffle 的执行性能和作业的稳定性,从而实现云原生环境的存算分离架构,赋能离线计算降本增效利器。后续对于 Celeborn 功能特性、性能优化以及问题修复持续和社区保持共创,Celeborn 规划事项主要包括:

  服务优化

  故障自愈

  B站一站式大数据集群管理平台 BMR 管理大数据服务组件和集群机器,通过 BMR 智能运维平台提供的故障自愈系统支持 Celeborn 服务故障自愈,实现对异常和故障实现自动化智能化的自愈处理。对于 Celeborn 生产故障而言,经常遇到磁盘相关的异常情况,故障自愈系统针对磁盘的故障自愈。对三类磁盘异常进行故障自愈处理:

  1. 硬件故障:对于有明显硬件故障,存在明显物理损坏的磁盘,自愈系统自动优雅下线 Worker 服务,将硬件故障磁盘从服务中剔除,同时提交报修更换磁盘请求,等磁盘更换完毕后自愈系统重新上线服务继续使用。

  2. 机器性能不足且影响业务:虽然没有明显的物理故障,但是性能显著下降并对 Celeborn 服务造成影响的磁盘,结合业务需求利用机器学习对磁盘指标数据进行深度分析和预测,支持提前识别潜在的问题磁盘实施自愈处理措施。

  3. 寿命不足:针对磁盘寿命不足场景,Shuffle 频繁磁盘读写操作加速磁盘磨损和寿命消耗,需要定期及时更换磁盘。为了避免磁盘寿命耗尽影响到 Worker 服务,结合磁盘寿命统计数据和实时性能表现综合判断是否需要换盘。

  同时故障自愈平台需逐步覆盖 Celeborn 服务可能出现的 IO Hang住、服务进程异常、端口异常、访问异常、服务日志异常等故障场景,尽可能实现 Celeborn 服务更多故障的故障自愈处理。

  弹性扩缩容

  目前 Worker 高优集群Shuffle 流量普遍集中在个别时段,其余时间端基本处于服务空跑状态造成资源浪费。为了提高集群资源利用率并且节省机器资源成本,有必要根据集群运行时资源负载对集群进行弹性扩缩容,实现如图19的 Worker 集群潮汐部署模式,支持自动扩缩容可参考[CIP-13] Support automatic scaling。Worker 高低优集群通过 Worker 自动扩缩容方式采用0点到9点低优集群缩容+高优集群扩容保证更多资源提供给优先级高的基线任务,9点到23点反向扩缩容把资源更多提供给低优任务方式进行潮汐部署,从而提高 Worker 集群资源利用率避免高优集群出现空跑浪费集群资源,能够按照作业的优先级自适应提供服务资源。  

图19:Worker 集群潮汐扩缩容图

  除了上述故障自愈以及扩缩容功能外,Celeborn 其他功能特性考虑支持:

  支持带优先级的 IO 调度能力,适配不同优先级作业灵活调度:目前采用 Worker 高低优集群运行相应优先级作业保证 Shuffle 资源使用,此方式无法根据磁盘负载调整 Shuffle 的 IO 调度导致低优作业的 Shuffle 抢占高优资源,难以适配作业的优先级达到优先保障高优作业 IO 读写操作效果。为了适应优先级作业需求,避免低优作业 Shuffle 阶段抢占高优作业的资源,引入带优先级的调度策略优化调度策略,提升高优作业的 IO 读写保障,其详情见[CELEBORN-952] Support IO scheduler of Worker with priority。

  支持 Remote Spilled Data,完全托管大数据计算引擎中间数据:大数据引擎的中间数据主要有两个来源: Shuffle 和 Spill,计算引擎在内存不足时将数据溢写到本地磁盘,例如 Spark 的排序、RDD 缓存以及 Hash Relation 等都可能溢写到磁盘,同时比如运行在 Kubernetes 上的 Spark 通常不希望 Pod 预留大容量的本地磁盘,因此需要 Celeborn 支持 Spilled Data,使得在存算分离架构里计算节点真正解除对大容量本地盘的依赖,计算结束能够释放节点,具体详情见[CELEBORN-1435] Support spilled data。

  引擎集成

  Celeborn 引擎集成主要围绕 Spark 和 Flink 引擎集成的稳定性和性能建设,其中包括:

  1.Spark 集成 Celeborn

  Spark 作业逐步放量,Celeborn Shuffle 流量占比计划提高到90%以上,Spark 作业基于 Celeborn 元仓数据衡量集群是否能够支撑相应流量进行逐步放量替换 Push Based ESS,而 Push Based ESS 作为兜底方案避免 Celeborn 集群出现严重故障期间没有可用的 Remote Shuffle Service。

  落地 Skew Join 无需文件排序优化:当读取 Reduce 模式倾斜分区的 Shuffle 文件时会对 Shuffle 文件进行排序,其文件排序造成生产环境有比如 Shuffle 排序超时以及性能问题。Skew Join 场景 Shuffle 文件排序造成的问题目前只能采用无需文件排序优化解决,具体优化逻辑可参考[CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files。

  支持更多 Shuffle Fallback 策略,保证高优作业稳定运行:根据生产环境发生过的异常故障适配更多 Shuffle 回退策略,采用回退到 Push Based ESS 方式降低 Spark 作业失败率,满足 SLA 要求保证作业的稳定性。

  集成 Native 引擎 Gluten/Blaze 优化 Shuffle 性能:Celeborn 支持 Native 引擎 Gluten/Blaze 集成,通过集成 Native 引擎同时保留 Native Columnar Shuffle 和 Celeborn Remote Shuffle 的核心设计,不仅保留 Columnar Shuffle 的 Hash-based Shuffle/Native Partitioner/零拷贝等高性能设计,而且充分利用 Celeborn 远端存储、数据重组和多副本的能力,叠加两者的优势大幅提升 Shuffle 性能。

  2.Flink 集成 Celeborn

  落地 Hybrid Shuffle 集成 Celeborn:Flink 社区在 1.16 版本引入流批一体的 Shuffle 模式 Hybrid Shuffle,它是 Blocking Shuffle 和 Pipelined Shuffle 的结合,让 Flink 批处理具备了更强大的能力。Celeborn 社区已经支持 Hybrid Shuffle 集成 Celeborn 方案,落地此集成方案能够带来动态调度,支持边读边写,基于自适应分层存储实现优先高性能的存储层,多层之间灵活切换,面向云原生环境友好的 Shuffle 模式等好处,详情参考[CIP-6] Support Flink hybrid shuffle integration with Apache Celeborn。

  落地 JobManager Failover:Flink 社区在 Flink 1.20 版本引入一种新的批处理作业恢复机制 JobManager Failover 方案,使 Flink 批处理作业能够在 JobMaster 故障转移后尽可能多地恢复进度,避免重新运行已经完成的 Task,这样使得 Flink 作业能够在 JobManager 发生故障时可以恢复已经完成的 Task 的计算结果,从而大幅降低 JobManager Failover 的代价,Celeborn 社区目前也正在支持 JobManager Failover,落地此恢复集成降低 Celeborn 的 Flink 批作业运行代价。

  支持 ReducePartition:ReducePartition 是由多个上游 Map Task 将属于同一个 Partition 数据推给同一个 Worker 做聚合,下游由 Reduce Task 直接读取,Celeborn Flink 目前只支持 MapPartition,可以考虑支持 ReducePartition 实现双 Shuffle 机制之间动态切换达到更优的 Shuffle 性能。

  支持 Celeborn 客户端与 Flink 发版解耦:Celeborn Flink 客户端发版强绑定 Flink 引擎镜像,对于紧急的问题修复和性能优化而言无法采取独立发布上线。为了提高 Celeborn 组件维护效率,需要支持解耦 Celeborn 客户端和 Flink 引擎,实现 Celeborn Flink Client Jar 单独发版。

  允许根据一系列 Shuffle 回退策略决策 Celeborn 是否 Fallback 到 Netty Shuffle 实现,实现细节参考 [CELEBORN-1700] Flink supports fallback to vanilla Flink built-in shuffle implementation。

0
相关文章