为了有效管理云成本,基于携程混合多云和自建PaaS为主的现状,混合云团队研发了FinOps计费系统。本文将介绍计费系统基于Kafka构建的接入体系在数据质量与治理方面的挑战,并分享基于自研Kafka Gatekeeper构建度量及治理自助化自动化的实践。
一、现状与问题
1.1 现状
图1-1
如图1-1所示,携程目前使用了混合多云的模式,同时也以自建PaaS服务为主,因此计费系统除了需要从云商获取账单等信息,还需要接入几十种自建PaaS服务的用量信息。其主要结构如下:
1)TripCostAllocationProtocol: 为了计费接入的扩展性,计费系统设计了计费协议,兼容混合多云模式,支持自建及原生PaaS/SaaS服务。
2)计费数据接入:云原生服务统一由计费系统处理,自建服务由各团队按TripCostAllocationProtocol定期打点的方式,将用量信息投递到Kafka中。
3)计费处理:计费系统根据接入的账单、用量以及服务间的关系,进行递归结算,并将结果落到内部数仓。
1.2 问题描述
计费系统上线后,成功为管理层、运营以及研发等角色提供了计费及成本分析的能力。在数据质量上,系统在数仓结果表上创建了对应的检测规则,针对明显违反业务逻辑的数据可以触发告警。但是,随着系统的推广,数据质量的问题仍然居高不下,具体表现如下:
问题发现:
a. 覆盖率低:针对数据错误程度在一定范围内,但仍然符合业务逻辑的问题,无法被检测到。
b. 及时性差:由于检测基于结果表,告警只能针对计费结果,告警结果有滞后性。
问题定位:
a. 效率低:检测是运行在计费系统内部的结果表上,需要多个数据接入方团队及计费系统开发人员共同排查,确定问题发生的源头及原因。
问题治理:
a. 责任不明:由于质量检测基于结果表,但造成问题的源头多样。无法通过对结果的检测直接将问题归属到对应团队,问题无法获得及时的关注与处理。
数据质量是计费系统的生命线,原有系统在质量上的问题导致计费系统开发团队大量的时间被消耗在对质量问题的响应、排查与修复,无法集中精力投入在产品迭代上,也无法应对更多服务的接入。
1.3 解决方案
针对以上问题,我们决定重新构建数据质量治理的能力。目标如下:
问题发现
a. 全问题覆盖:从数据源头出发,所有不符合校验规则的初始数据都可以被发现。
b. 及时性提高:数据异常在进入计费链路之前就可以被发现,而非通过计费结果告警。
问题定位:
a. 提高效率:无需多方团队协调排查,自动捕获问题源头及问题发生的原因。
问题治理:
a. 责任明确:问题产生的当下就告知相关责任人,明确问题治理的对应团队。
通过分析数据质量的案例,发现绝大部分数据质量的问题来自于几十个自建服务数据接入方。根据以上目标的梳理的问题的分析,我们决定引入Kafka Gatekeeper组件,重点解决自建服务接入的质量问题,如图1-2所示。该组件提供以下能力:
1)校验前置:打点数据在进入计费逻辑前,先进行规则校验,保证问题发现及时性。
2)规则可配置:校验规则可随时配置、随时更新,保证规则检测的全覆盖。
3)自助排查:提供自助查询看板,包括数据错误条数,问题发生原因等信息,研发可自助查询对应团队的相关信息,提高问题定位效率。
4)自动告警:检测发现不合规数据(如字段缺失、数据类型错误等)时,向数据来源的团队发送告警,明确问题治理责任。
图1-2
二、设计与核心实现
2.1 Kafka的相关背景知识
为了实现Kafka代理服务的数据校验功能,需要解决以下两个问题:
1)如何根据Kafka协议对消息进行解码。
2)如何处理Kafka客户端,服务端和代理之间的连接关系。
2.1.1 通讯协议
图2-1
如图2-1所示,Kafka请求只能由Client主动发到Broker,Broker针对每个请求回复响应给Client。
Kafka使用基于TCP的自定义二进制协议。它定义了客户端和服务器之间的消息格式、消息传递方式和处理逻辑。所有消息都是通过长度来分隔,并且由基本类型组成。请求由请求类型(ApiKey),版本号(ApiVersion),相关性标识(CorrelationId),客户端标识(ClientId)和请求消息(RequestMessage)组成。响应由相关性标识(CorrelationId)和响应消息组成(ResponseMessage)组成。
ApiKey用于确认Request的类型,以通过不同类型的数据格式解析请求。Request和Response通过CorrleationId来一一对应。
由于发送生产消息,仅包含两种API--元数据(Metadata)和生产(Produce),本文仅关注这两种API的请求和响应,协议格式见图2-2。
图2-2
Metadata是用于获取元数据的API。元数据请求在携带topic_name时会返回topic相关的数据,如果为空则返回所有主题。元数据响应返回的数据包括一串broker的数据信息,以及topic名、分区信息等。图中省略部分内容,仅展示和本文相关的部分。
Produce是用于将消息集发送到服务器的API。生产请求将携带目标topic,以及分区信息,其中分区信息中包含所要发送的具体消息记录集合。生产响应返回的数据包括具体的请求结果。图中省略部分内容,仅展示和本文相关的部分。
通过了解以上两种API的格式,可以基于协议格式进行解码。
2.1.2 交互流程
处理连接关系,还需要了解Metadata、Produce协议的交互流程。
元数据请求可以发往任意broker。Kafka集群会提供Bootstrap地址,由此地址负载均衡到某一服务器并返回。客户端提供一组topic,服务端返回元数据响应,包含所有的broker信息和相关的topic信息。broker信息中包括节点的IP地址,即客户端真正发送生产信息的服务器地址。
生产请求将会发送到元数据请求中返回的某一服务器上,服务器端将会返回请求结果。
图2-3
如图2-3所示,将集群简化为一个Broker,Produce的具体流程:
1)Client向Bootstrap地址发送元数据请求,查询集群当前Broker列表。
2)Bootstrap真实响应的Server其实是(某一个)Broker,Broker返回了所有的信息包含在元数据响应中。
3)Client向真实的Broker地址发送生产请求。
4)Broker处理请求,并回复响应。
通过了解Kafka生产的基本流程,可以实现代理,接管并处理其中的连接关系。
2.2 Kafka Gatekeeper的设计和实现
Gatekeeper作为Kafka客户端和服务端之间的代理,接受客户端的请求对于指定内容做数据校验,并转发给服务器,同时将服务器的响应返回给客户端。
对于客户端来说,仅需要将原本配置的Boostrap地址改成Gatekeeper的地址。
对于Gatekeeper来说,需要做到:
1)设计解码器和解码方案:解码Kafka消息,从而进一步进行处理。
2)设计校验器和校验规则:进行数据校验,提高数据质量。
3)维护Boostrap地址和Gatekeeper地址之间的映射关系:处理客户、Gatekeeper、服务间的连接。
Gatekeeper设计架构如图2-4所示。
图2-4
解码器用于在处理请求时,根据Kafka协议和自定义的解码方案解码。当解析元数据请求时,根据自定义的映射关系修改返回的元数据响应。
校验器当解析的是生产请求时,会根据自定义的校验规则进行校验。
映射关系被维护在Gatekeeper中处理连接关系。
Gatekeeper维护的映射关系中,由于Kafka的默认端口号是9092,"Gatekeeper的IP地址+9092端口"的连接将与"Bootstrap的IP+9092端口"做映射。"Gatekeeper的IP+port1"的连接将与"Broker1的IP+9092端口"做映射,"Gatekeeper的IP+port2"的连接将与"Broker2的IP+9092端口"做映射,依此类推。Gatekeeper就可以根据这个映射关系,处理来自客户的请求,发送给相应服务端,并同样处理来自服务端的响应。
总而言之,Kafka Gatekeeper监听了客户端发来的请求,根据配置转发给服务端,一方面解析了客户端的生产信息做数据校验,另一方面修改了服务端的元数据响应信息给用户,以保证用户的生产信息总是通过Gatekeeper进行转发。
2.2.1 利用通讯协议进行解析
通过前文可知,每个请求和响应都有固定格式的header和具体的请求包。而由于Kafka每一种协议也都有固定的格式,Kafka协议中可使用的数据类型是固定的,且是按顺序存储的。
因此,只需给每种数据类型实现一个特定的编解码方案,并通过header中携带的ApiKey和ApiVersion,确定某一个解码格式,就可以根据收到的包序列化数据。
综上所述,Gatekeeper的解码器需要完成两个任务:实现不同数据类型的序列化功能,以及根据版本确定协议格式。
以version-1的生产消息为例:
从编解码角度来说,每个协议包都是由4字节的size开头,后面再跟相应字节的请求包或响应包。解码器首先会通过序列化功能解析了这个4字节的size,计算出请求包的大小。
同样的,解码器计算出2个字节的ApiKey和ApiVersion(和本文无关的其他字段暂时略过)。解码器计算出生产请求的的ApiKey为0,ApiVersion为1。这样解码器的确认版本功能就能确定一个协议格式,再根据这个格式的数据类型去逐个做解析。
至此,Gatekeeper可以就根据不同类型,不同版本的的客户端请求,完成解析。根据即时的解码内容,针对需要再进一步处理,可以保证问题发现的及时性。
2.2.2 利用交互流程进行连接处理
Gatekeeper的工作原理是在本地机器上打开tcp套接字,并在使用套接字时,代理连接到相关的Kafka服务,它将本地端口与真实的服务地址进行映射。
图2-5
如图2-5所示,用户加入Gatekeeper服务后,Produce的具体流程:
1)当客户端发起第一条元数据请求时,发送地址是Gatekeeper地址,请求将会被Gatekeeper监听到。
2)Gatekeeper发起一条到服务端的连接,把监听到的连接发送给Boostrap地址,同时存储一份Boostrap地址和本地地址的映射。
3)元数据响应会返回一串Broker的相关信息,Gatekeeper接收到相关信息后,会解析内容将Metadata数据中原本的节点IP信息,替换成Gatekeeper的地址。
4)当客户端发送Produce请求时,通过接收到响应里的地址和Gatekeeper建立连接的。
综上所述,客户端的Produce流程都经过Gatekeeper,Gatekeeper可以对所有的Kafka消息进行校验。保证打点数据进入计费链条前先进入校验逻辑,实现校验检测全覆盖。
2.2.3 可配置化校验与自助异常定位
Gatekeeper的目标,是提供一个针对Kafka消息的前置数据校验代理,解决接入服务的数据质量问题。从数据源头入手,配置校验规则检查每个topic的数据是否合规,定位异常数据来源,向相关团队告警,并提供自助排查看板。
提供可自定义配置的校验规则,可以随时更新、订正,并且提供根据解码内容明确责任人的功能。Gatekeeper的校验器会根据配置的规则,对比分析解码内容做校验。提供包括判断字段类型、检查字段缺失、以及符合CEL语法的校验规则等功能。
以如下schema为例,TripCostAllocationProtocol约定某topic必须包括,不为空的字符串Name字段,和可选且大于零的整形Timestamp字段。
相应的,Gatekeeper配置如下,针对该topic的TripCostAllocationProtocol约定,配置校验规则。在数据流经代理时,根据规则全面检测。
以Timestamp字段非法零值为例,当Gatekeeper检验发现,此条消息不符合配置的规则 "Timestamp>0",会根据配置的Owner,锁定数据来源“Service A”并告警反馈给该团队,明确问题责任。
此外,Gatekeeper也提供了自动告警功能和自助查询功能。如图2-6所示,处理用户生产数据的流程以实线表示,是“用户-代理-服务”的线性流程,全覆盖所有数据一进行校验。同时Gatekeeper向日志系统和监控系统分别发送了校验失败指标和详细信息日志。
图2-6
校验到不合规数据时,用户接收监控告警,通过监控系统可以查看包括数据错误条数,校验通过率等内容。
以上述错误为例,自建PaaS服务Service A收到fake.topic中生产了校验不合法数据的告警。Service A的研发团队通过检查告警系统检查告警信息。研发团队可以跳转到对应的日志系统,以检查错误日志以及校验不通过字段的规则,根据错误日志自助修复数据。
通过自动告警,提高了问题的定位效率,明确了问题责任方。通过提供自助查询看板,可视化展示校验结果和异常来源,方便研发团队自助修复数据,闭环治理流程。
2.2.4 高可用部署
根据携程的可用性优秀实践,实现跨AZ高可用和数据校验就近处理。如图2-7所示,服务给用户提供统一入口,并在AZ内部署多个实例提供服务。
图2-7
如图2-8所示,(1) 是原始的Kafka客户端和服务端交互过程示意图,(2) 是单AZ内,增加了Kafka Gatekeeper做代理后的交互过程示意。
图2-8
此前,Gatekeeper的地址需要承担的三项责任--监听,广播和提供入口:
1)监听来自客户的元数据和生产请求。
2)返回元数据响应时,提供给用户新的生产请求发送的广播地址。
3)提供客户端,用于替换原本Bootstrap的入口地址。
在高可用的部署架构下,Gatekeeper的地址不再承担提供入口的责任,客户端使用总的入口地址代替原本需要配置的Bootstrap地址。
2.2.5 技术挑战
在某次重启服务后,尽管服务看起来正常运行,并且新增接入的客户端也符合预期,但部分已接入的Java客户端总是连接失败,直到客户端自行重启服务。经过测试发现,这种情况与客户端刷新元数据的行为有关。
Kafka生产请求不可达时,客户端会做一次元数据的刷新。响应这个刷新元数据请求的Broker会根据注册中心返回当前可达的Broker节点列表。在第一次发送元数据请求时,客户端连接的是Bootstrap地址(加入Gatekeeper的流程中,连接的是Gatekeeper的IP地址)。然而,在刷新元数据时,客户端的行为有一些区别。
图2-9
如图2-9(1)所示,测试时使用的客户端刷新MetaData的请求,和初始发送元数据请求一样,发往Bootstrap地址。而Java客户端只有第一次启动时把元数据请求发给Boostrap地址,如图(2)所示,Java客户端的刷新MetaData的请是直接打给具体Broker地址的。
在加入Gatekeeper服务之后,如图2-10(1)所示,监听请求的地址和返回的广播地址是Gatekeeper自己的IP地址,广播地址用于代替元数据响应中的的Broker地址信息。也就是说,Java客户端在刷新元数据信息时也会将请求发往Gatekeeper的IP地址。而由于服务重启,Gatekeeper实例的IP地址发生了变化。因此,客户端也无法通过刷新元数据得到目前可达的服务地址,导致连接失败。
图2-10
总之,不同客户端在刷新元数据的方式上存在差异。Java客户端会缓存Gatekeeper的IP地址,当这个地址变得无效时,连接就会失败。而当客户端的连接总是发送到一个有效的负载均衡地址,因此不会出现这样的问题。
因此,只需让客户端缓存的地址,也就是在元数据响应中返回的地址,总是有效的,就可以避免以上问题。
原本Gatekeeper的IP地址承担了监听和广播两个功能,其中广播地址与Broker地址一一对应。如图2-10(2)所示,为了解决固定IP的问题,服务使用挂载的固定负载均衡地址替换原有广播地址。同时,使用固定范围的端口号代替随机的端口号,且LoadBalancer上与Gatekeeper上映射的端口是完全一致的,就可以解决固定端口的问题。
这样,即使Gatekeeper不像Kafka一样使用注册中心来注册所有可达的地址,仍然可以确保客户端始终能够找到服务,而不会丢失连接。
三、总结
Gatekeeper是一个提供对Kafka数据进行校验的工具,并实现校验规则的可配置化。同时Gatekeeper还提供了可视化展示校验结果和异常来源的监控看板,并提供自助查询错误日志的功能。在源头实现定位异常,保证了规则检测全覆盖,并提供自动异常发现和自助异常定位服务,从而完成了治理闭环,提升了数据质量。
目前Gatekeeper的适用范围仅限于FinOps计费系统,但整体架构是针对Kafka消息设计的,因此它可以作为一个可复用的数据校验代理。未来,Gatekeeper希望能提供一个通用的数据系统校验处理能力,以解决更广泛的数据质量问题。