服务器 频道

Flink on K8S 在网易传媒的落地实践

  随着云原生技术的成熟和 Flink 版本对 K8S 支持的持续完善,网易传媒在 2022 年开始对 Flink on K8S 进行探索和落地,目前已迁移完成大部分作业至自研实时计算平台 Riverrun,并实现 Flink 实时计算与 Spark 离线计算在 K8S 上的稳定混部,带来了可观的“降本增效”收益。

  本文是《SparkSQL on K8S 在网易传媒的落地实践》的姊妹篇,将对我们 Flink on K8S 落地实践过程进行初步梳理总结,希望能给大家提供一些有用的参考。

  1 Flink on K8S 带来的收益

  Yarn 上没有实现较好的资源隔离,某个作业造成某台机器负载过高时, 会对该机器上其它作业造成影响;即使单独拆分出独立的实时计算集群,仍会存在实时作业之间相互影响的情况。另外,由于实时、离线计算均存在周期性数据处理需求的波动(“潮汐现象”),往往需要我们按照峰值来进行资源配置,存在资源浪费的情况。

  大数据计算积极拥抱云原生是业内发展的一大趋势,离线计算方面,网易传媒已经实现了 Spark on K8S 的稳定运行[1];实时计算方面,Flink on K8S 的落地则主要可以带来以下收益:

  利用 K8S 对资源隔离的较好支持,降低作业之间的相互干扰。

  整合实时、离线两套集群的计算资源池,实现统一运维和调度,减少基础设施开销。

  通过实时、离线计算混部带来的资源峰谷互补,有效提升集群的资源利用率,且大数据业务内的混部可以更方便的协调资源。

  2 Flink on K8S 部署架构

  目前,Flink 支持多种 K8S 上的部署模式:

  Standalone 部署模式:与物理机上的 Standalone 部署模式类似,用户需要自行维护所有资源的创建和管理。

  Operator 部署模式:根据用户的配置,基于 Flink Operator 自动管理 Flink 作业及其生命周期。

  Native 部署模式:类似于Yarn上的部署模式,JobManager 根据作业的需求自主创建和管理 TaskManager Pod。

  其中,Standalone 部署模式原理和使用比较简单,但存在资源利用效率低,维护成本高等问题;Operator 部署模式目前缺少可视化的界面,对于刚接触 K8S 的用户来说,首次使用的学习也成本较高;Native 部署方式由 1.12 版本起正式推出,经历了几个版本的持续完善,目前已经比较成熟且是社区推荐的 K8S 部署方案,因此我们主要选择了此模式进行探索落地。

Flink on K8S Native 部署架构

  如图所示,Flink Client 原生支持了 K8S Client,Job 提交后 Flink Client 会通过 K8S API Server 完成 ConfigMap、JobManager Deployment、Service 等资源的创建,然后 JobManager 中的 Resource Manager 模块会根据作业的资源需求与 K8S API Server 交互来进行资源的申请,完成 TaskManager Pod 的创建和销毁工作。

  实际落地中,状态快照存储方面,我们保留了小规模的 HDFS 集群,来尽可能地减少迁移和学习成本;高可用方面,也保留了当前团队运维经验比较丰富的 Zookeeper 来作为过渡。

  3 Flink on K8S POC 关键问题解决

  1.作业发布和启动速度

  Flink 实时计算场景中,相当一部分作业是以 Jar 的形式发布的。目前一些开源的方案是将用户作业 Jar 直接打包成专属镜像,不仅过程非常耗时,还会导致镜像仓库膨胀,这对于非常注重时效的实时计算业务来说,是难以接受的。

  对此,我们采用作业启动时资源动态挂载的方案,通过自定义 Init Container,在 Flink Pod 启动时,将用户作业 Jar 及其依赖的第三方 Jar/算法模型/本地方法库等文件地址(HDFS/S3)传递给 Init Container,由 Init Container 下载后通过 emptyDir 挂载到 Main Container 中,解决了频繁打包作业镜像的问题,有效提升了作业发布和启动的速度。而对于动态资源的共享,则可以通过自定义挂载 CephFS 等共享盘来实现。

Flink 作业启动时动态挂载资源

  另外,Flink 作业在启动或者 Failover 时,所有的 Pod 同时拉取作业文件资源,对 HDFS 等分布式存储性能要求较高。对此,我们主要采取了以下优化措施:

  Init 镜像精简及优化:为最小化 Init 镜像体积和加快资源文件的下载速度,快速完成环境初始化,将 HDFS Client 由原生的 Java 版本换成了 Go 版本,镜像体积减少 80%+,同网络下拉取普通资源文件速度提升 50%+。

  资源文件本地缓存:通过在 K8S 节点本地缓存作业的资源文件,大幅提升了下载速度。

  镜像发版前预热:使用 DaemonSet 等机制对新版本镜像进行预热,提升每次发版后首批作业的启动速度。

  Init Container 个性化处理:对于没有外部依赖的作业,TaskManager Pod 不引入 Init Container。

  经过优化,K8S 环境下的作业启动速度有了明显提升,做到了可对标 Yarn 的同等水平。

  2.作业 stdout 的采集和查看

  Flink 的 K8S Native 部署模式中,为了使容器保持一个前台服务,使用了 flink-console.sh 在容器前台运行 Flink 服务并进行 stdout 输出,用户无法正常地通过已经习惯了的 Flink Web UI 来查看,而且某些业务场景对 stdout 有较强烈的诉求。

  通过参考外部的一些解决方案,我们将 JobManager/TaskManager 的启动入口替换为 flink-daemon.sh,并改造使其支持 kubernetes-application 作业的提交,实现了容器环境下 Flink 服务的后台运行,恢复了 stdout 重定向文件的写入和通过 Flink Web UI 查看 stdout 的功能,并将容器前台日志替换为 Flink log,以方便定位问题。

改造 flink-daemon.sh 将前台日志替换为 flink log

  3.作业 Web UI 的外部访问

  对于作业 Web UI 的暴露和访问,Flink 支持了 K8S 的全部三种 Service。其中,LoadBalancer 需要额外服务,且成本较高;而 NodePort 需要在每台机器上映射端口,只适合小规模使用;ClusterIP + Ingress 则是比较合适的方案,一方面可以直接映射到域名规则,方便管理且易于访问,另一方面使用 Headless ClusterIP 则可以尽可能地降低服务成本。

  我们基于 Nginx Ingress Controller,实现了办公网络下对 Flink 作业 Web UI 的直接访问:

通过 Ingress 暴露 Flink Web UI

  4.作业本地状态的存储需求

  Flink 作业由于其有状态的特性,在很多场景下对存储和 IO 有较强的需求,使用了 RocksDB 等需要依赖本地存储 的 State Backend。对此,我们挂载了节点本地的 SSD 来进行支持,但由此暴露出另外一个问题:由于各作业 State Backend 配置的不同和状态存储需求上的差异,很难做到各机器节点磁盘负载的均衡,导致容易出现浪费情况。

  于是,我们把集群节点按照是否有挂载 SSD 进行了 label 分组,并将使用 FileSystem 和 RocksDB 等不同 State Backend 的作业进行区别调度,只将使用了 RocksDB State Backend 的作业调度至挂载了 SSD 的节点分组,一定程度上提高了 SSD 的利用率。

  挂载本地盘的方案会导致作业对集群环境产生依赖,不符合完全存算分离的理念。这方面,社区提出了 Tiered State Backend 的思路,将状态数据全部都原生存储在 HDFS 或者云上,本地磁盘和内存只作为状态数据的缓存加速层,目前行业内也出现了一些该方向上的探索和实践。

  5.混部场景下作业的抗干扰

  尽管 K8S 混部系统在资源隔离方面做得越来越完善,但在真实复杂的混部场景中,仍难免出现离线业务偶尔突发的负载对在线业务造成性能上的扰动。针对 K8S 混部集群常出现的组件/网络抖动等问题,我们对 Flink 引擎的默认参数做了对应的调整以提升作业的抗干扰能力。以下简单列举几个有代表性的:

  调大 `kubernetes.transactional-operation.max-retries`,以减少 JobManager 与 API Server 通信异常导致的作业 Failover。

  将 `high-availability.zookeeper.client.tolerate-suspended-connections` 设为 true,以避免 Zookeeper 连接 SUSPENDED 状态导致的作业 Failover。

  调大 Akka RPC 超时时间 `akka.ask.timeout` ,调大 JobManager 和 TaskManager 心跳超时时间 `heartbeat.timeout` 等,以减少网络抖动导致的作业 Failover。

  4 Flink on K8S 实时计算平台特色

  Flink on K8S POC 后,我们自研落地了实时计算平台 Riverrun,目前支持 JAR/SQL 类型作业的开发运维,并整合了 Jar 作业在线构建、资源自动化调优推荐、日志检索、异常智能关联诊断、作业批量运维等特色功能。  

Riverrun 实时计算平台架构

  1.Jar 作业支持在线构建和发布

  目前,平台上的大部分作业是以 Jar 形式发布的,原有的发布流程是由开发人员本地直接打包上传,存在版本管理混乱、本地打包环境不一致、非公司网络下上传缓慢等问题。

  为加强对作业版本的管控和加速发布流程,平台集成了作业在线构建能力。用户只需要简单配置 Git 仓库、代码分支、模块等信息后,即可快速完成作业的在线构建和发布。

  作业在线构建模块根据公司的 Maven 镜像等基础设施环境进行了最优化配置,支持自动识别项目构建配置、实时查看构建日志、自动复用构建包缓存等功能,有效减少了人为因素引入的问题,实现了线上作业构建环境的统一。

Jar 作业的在线构建和发布

  目前,平台还在朝着 CICD 的方向继续完善,以进一步提升用户的开发和运维体验。

  2.作业资源配置的自动化推荐

  Flink on Yarn 场景下由于缺乏较好的 CPU 隔离机制,作业 TaskManager Container 的 CPU 申请量不能如实的反应实际使用量,两者间往往存在较大偏差。作业从 Yarn 上迁移到 K8S 上之后,由于 K8S 上较严格的资源管控机制,常常出现资源配置冗余、不足或 CPU/内存配比不合理的情况。

  另外,在实际业务场景中,开发人员在进行 Flink 作业资源配置时通常会留有一定的冗余,以保障作业的稳定运行,这往往导致集群的资源申请水位虚高,利用率无法保证。

  借鉴开源项目 Crane的资源推荐思路,平台根据作业历史运行指标实现了一套闭环的资源配置推荐机制。得益于 Flink 和 K8S 较为完善的 Metrics 体系,我们可以比较轻松地建立起作业的资源画像,通过“推荐->调整->重新画像”的持续循环,逐渐让作业资源申请与实际使用比率回归到较合理的水平:

  CPU:按照最近两周使用峰值的 P99 来进行推荐,另外平台还通过配置 JobManager/TaskManager 的 cpu.limit-factor 为来业务流量的抖动留出 buffer。

  内存:内存作为一种不可压缩资源,配置过低可能会引发 JVM 或 Pod 层面的 OOM。内存申请的推荐,在最近两周使用峰值 P99 的基础上增加了一定的 buffer,以减少因内存临时增长导致的潜在 OOM。另外,Flink 内存模型较为复杂,除了整体资源配置外,还需要综合考虑内存模型中各部分的使用情况,这方面有待方案的进一步细化。

  资源推荐的结果除了定期推送给用户之外,平台还支持用户主动地对作业最近运行情况发起分析,并实时给出建议。

作业最近运行情况分析

  目前,平台推荐的资源配置还有赖于用户评估后手动进行调整;针对一些风险较低的操作,平台正在灰度测试自动进行调整重启,进一步减轻人力负担。

  3.作业运行状态的全方位监控

  作为观察作业运行状况的核心手段,作业运行状态的采集和监控是平台必备的功能。采集的数据主要包括作业运行时的日志和 Metrics 等。

  日志方面,K8S 上由于缺乏类似于 Yarn 上的 Log Aggregation 机制,容器销毁后日志也将被删除,所以需要自行采集 Flink 日志用于检索。平台上将日志功能做了以下拆分:

  启动日志:包括 Flink Client 提交作业的日志和提交后 JobManager 初始化作业集群的日志,其中后者为通过 K8S API 实时拉取的 JobManager Pod 启动后截止到提交 Job 的日志,这部分日志需要在平台上实时展示给用户,让用户及时感知作业的启动情况。

  运行日志:包括作业运行时 JobManager 和 TaskManager 的日志,通过将 emptyDir 挂载到日志路径,然后在宿主机中对所有日志进行统一采集后经由 Kafka 写入 OpenSearch,用户可在平台上按启动批次+节点进行筛选浏览。另外,平台还支持了作业日志级别的动态修改,用户可以通过临时开启 DEBUG 日志进行问题排查。

  日志搜索:基于 OpenSearch 增加了运行日志的搜索功能,并整合了“快捷诊断”来帮助用户快速筛选线上常见的异常:

  from RUNNING to FAILED:该关键字后一般会紧跟作业失败的直接原因

  java.lang.OutOfMemoryError / Cannot allocate memory:发生了 OOM,需要优化内存使用

  Checkpoint was declined / Checkpoint was expired / Checkpoint canceled:Checkpoint失败,需要根据作业运行状况和日志上下文做进一步判断

  Exception / TimeoutException / NullPointerException:NPE、外部系统通信超时等各类异常

  Metrics 方面,Flink 引擎本身提供了大量丰富的指标,可以为作业的监控和告警提供基础的数据。但随着平台上作业数量的持续增长,Metrics 的采集、存储、查询都逐渐暴露出压力,如何实现海量 Metrics 数据的高效处理成为亟需解决的问题:

  Metrics 采集:我们按照社区的接口规范实现了 Kafka Metrics Reporter,支持按配置的规则过滤掉不需要的 Metrics,并对相同维度的多条 Metrics 进行合并,有效减少上报的数据量。

  Metrics 存储和查询:我们选择了支持云原生部署的时序数据库 TDengine,其超级表/子表及标签分离存储机制等设计为数据读写提供了较高的性能支持,但毕竟是较年轻的项目,我们在使用的过程中也踩到了不少坑,这方面后续再另行总结。

  除了 Flink Metrics,平台还采集了 K8S 上 Flink Pod 的 Metrics/Events 等数据,并接入了 K8S 集群和相关外部组件的监控数据,以求构建一套尽可能全面的监控体系。

  告警方面,在作业运行状态监控的基础上,告警可大致归纳为基础事件告警与复杂事件告警两类:

  基础事件告警:主要为作业存活状态等基础事件告警,如运行异常、失败或发生 Failover 等,通过监控数据直接判断或者后端服务定时轮询 Flink REST API 来实现。

  复杂事件告警:主要为统计学意义上的事件或趋势告警,如 Checkpoint 失败、数据堆积、反压等,支持自定义告警阈值,一般需要维护状态,通过 Flink 作业实时处理 Flink Metrics 和 K8S Metrics 来实现。

复杂事件告警支持灵活配置阈值

  目前,告警阈值的配置需要用户手动根据业务场景特点进行配置,一定程度上依赖于用户自身的经验。我们正在探索基于作业历史运行情况的阈值自动推荐,来进一步减轻用户的运维压力。

  另外,作为主动 push 给用户的通知,告警需在保证场景覆盖率的条件下,尽可能地减少给用户带来的打扰。平台针对配置了多个告警接收人的作业,提供了运维操作周知的功能,来避免重复运维;针对同一集群短时间内出现大批量告警的情况,设计了告警升级策略来减少告警轰炸,由平台运维人员介入排查。

  4.作业异常的智能关联诊断

  在真实的集群环境中,作业时常会受流量、磁盘、中间件、平台组件等影响发生异常。传统的作业异常诊断流程对人工的依赖较重,通常需要以日志作为排查的第一入口,先定位到引发异常的直接原因,然后再综合其它相关维度的监控指标来做更深层次的归因,不仅对用户的专业技术背景要求较高,也相当耗费用户的精力。

  业务上,实时作业作为 long-running 型的计算,业务对时效性的要求普遍较高,要求出现异常后能在尽可能短的时间内定位原因并恢复正常。因此,平台研发了智能关联诊断功能,对常见的 case 进行提炼总结,希望能帮用户提升诊断的效率,并把用户从重复性的场景中解放出来。

  作业异常的原因可以大致归纳为作业本身逻辑导致和运行时环境导致两种。作业本身逻辑导致的异常我们一般可以从作业日志中拿到直接原因,常见的如类不存在、依赖缺失、堆内存 OOM 等;而运行时环境导致的异常我们通常需要关联更多维的数据才能诊断出底层的原因,如出现脏数据、网络/磁盘故障、K8S 组件异常、外部中间件异常等。这需要我们尽可能地打通 Flink 作业和相关基础设施的运行状态数据,从一个个实际的 case 出发,把上下游事件串起来并从中提炼因果关系,沉淀出异常诊断的规则库。

基于规则库的作业异常诊断+处理方案建议

  目前,随着诊断规则库的持续积累,智能关联诊断功能已进入到灰度测试阶段。当作业异常命中诊断规则库后,平台会将诊断结果和处理方案建议推送给用户,由用户评估后跟进处理。另外,Flink 本身支持配置异常后的 Failover 重启策略,但当作业超过策略限制时会进入停止状态。此时,如果作业授权了托管运维且一定时间内无用户跟进处理,平台将尝试介入,按照规则库的处理预案处理后,拉起作业并将结果通知给用户,以尽可能保障作业的持续运行。

  5.作业的一键批量运维

  混部场景中由于各业务的复杂性,集群机器节点的运维相对来说变得更加频繁。以节点停机运维为例,传统的常规操作是直接驱逐节点上的 Pod,但这会导致短时间内大批量作业的被动 Failover,既会对 Checkpoint 存储集群造成较大的瞬时冲击,也容易因资源争抢导致线上业务的抖动。另外,集群故障、业务间资源借用等场景也对大批量作业的快速跨集群迁移提出了要求。

  为此,平台基于已有的作业启停流程设计了批量运维功能,支持用户根据机器节点、作业标签、手动录入等方式对线上作业进行圈选,然后以指定的并发度完成分批启停或跨集群迁移等操作。批量运维操作时,平台会将操作计划推送到项目组的用户群;操作过程中也会将操作进度和操作结果实时推送给操作人和作业归属人。批量运维功能的引入有效减轻了平台侧和用户侧的运维负担。

作业批量运维操作执行

  另外,在业务层面,针对集群/机器故障运维的场景,相较于直接驱逐 Pod 让作业被动 Failover,由平台主动进行批量迁移对Checkpoint 存储集群的冲击更可控,有助于减少资源争抢造成的踩踏事故;针对作业跨集群迁移的场景,则可以结合作业标签等的快速筛选,来优先保障核心 SLA 作业。

  5 总结与展望

  传媒当前大部分 Flink 实时计算作业已经迁移至 K8S 上,并实现了与 Spark 离线计算作业的稳定混部。未来,在继续提升 Flink on K8S 架构稳定性和完善实时计算平台的基础上,将主要针对以下几个方向进行着重探索:

  状态快照和资源文件使用对象存储:项目初期为尽快落地 Flink on K8S 和减少作业由 Yarn 到 K8S 的迁移成本,延用了 HDFS 作为快照存储。后续将探索对象存储(包括原生的和基于对象存储的 HDFS 兼容文件系统)的落地,以进一步打通整个部门的存储资源池。

  作业实现自动伸缩:对于在线业务而言流量存在波峰波谷,具有一定的周期性,作业负载也会随之波动。开发者往往按照流量峰值来进行资源配置,存在资源浪费的情况。实现作业的定时伸缩甚至是弹性伸缩,具有显著的意义,比如凌晨可以将实时计算作业缩容,以释放出更多的离线算力。

  Paimon 湖仓建设:大数据领域数据湖架构日益成为新的技术趋势,Paimon (原 Flink Table Store) 作为从 Flink 项目孵化出来的新一代流式数据湖存储,历史包袱小,后发优势明显。基于 Paimon 探索流批一体湖仓的建设将是非常好的选择。

0
相关文章