服务器 频道

小红书离线数仓提效新思路,提升百倍回刷性能

  数据处理效率一直是大数据时代的核心话题,它推动着各类数据执行引擎持续迭代产品。从早期的 MapReduce,到今天的 Spark,各行业正不断演进其离线数仓技术架构。

  现有以 Spark 为核心的数仓架构在处理大规模数据回刷方面已取得进展,但在资源和时间消耗上仍面临挑战。为了突破这些限制,小红书数据仓库团队将 StarRocks 融入到离线处理流程,替换掉部分 Spark 处理的任务,并优化较为耗时的 Cube 计算,大幅度提高了数据的执行效率。

  实践证明,经过改造的离线处理链路,可以有效降低任务资源消耗,提前数据产出时间。将作业执行时间从小时级压缩至分钟级,计算资源使用量降低 90% 以上,日数据产出时间提前 1.5 小时,回刷时间减少 90%,回刷成本减少 99% 以上。  

  

  为了更好地管理和使用数据,离线数仓一般会通过分层设计,确保数据高效利用。

  ODS 层(操作数据存储层):收集来自客户端和服务端数据的原始日志。其中,服务端数据存储结构与线上表结构保持一致。

  DWD 层(事实明细层):ODS 层数据在此层进行清洗和整合,经历必要的数据转换和计算,从而形成一个详细的、一致的、历史的和集成的数据集。

  DWS 层(数据聚合层):该层汇总 DWD 层数据,分为轻度汇总和汇总。轻度汇总维度较多,便于上卷,形成汇总层。数据一般为当天的计算或加总。

  DM 层(数据宽表层):这一层有确定的核心实体或者场景,可能跨数据域。根据业务需求,基于某个分析主题进行数据加工,对 DWS 层数据进一步地加工处理,形成各种丰富的数据模型。与 DWS 层的主要区别在于:度量值中是否包含“一天以外的加工数据”,如近 7 日,近 30 日,近 90 日等多日聚合指标。

  APP 层(数据应用层):在这里,DM 层的数据结果会被转化为直观的报表、动态的大屏、和便捷的数据服务,以支持决策和业务洞察。为了提升查询效率,数仓会预先计算 Cube(即不同维度组合下的指标),将其存储在表中。

  DIM 层(公共维度层):这一层用于存储各类实体的维度数据,为数据分析提供多角度的视野。

  离线数仓一般以 Spark 引擎作为主力,它负责数据的清洗、关联和聚合,完成所有数据模型的建设。随后,通过 DTS 任务将 APP 层的数据导入到 OLAP 集群中。小红书主流的 OLAP 引擎包括 StarRocks 和 ClickHouse,它们凭借 OLAP 引擎的查询能力,为数据产品、分析看板和业务工具提供数据查询服务。  

  虽然 Spark 引擎以其强大的吞吐量和稳定性在离线数仓中被广泛使用,但它在数据查询优化方面存在局限。Spark 并不直接管理数据的分布、存储格式或元信息,无法结合数据存储格式和数据元信息进行查询优化。此外,为了确保稳定性,Spark 在跨节点数据传输时需要将数据写入磁盘,这在大规模数据回刷时会导致资源消耗巨大和处理周期延长。

  从本质上来看,Spark 仅仅是一个数据处理引擎,而不是一个理想的数据仓库分析引擎。在实际应用中,这种性能瓶颈尤为明显,开销较大。例如,以交易运营行业为例,若要回刷两年的数据,则需要占用相当于 7 万台机器近 30 天的资源,成本高达上百万元。这种定期数据回刷产生的巨额成本,已经成为数据仓库团队不得不面临的问题。  

  与 Spark 这类数据处理引擎不同,基于 MPP 架构的 OLAP 引擎在数据查询方面是更具优势的。市面上常见的 OLAP 引擎主要有两个:ClickHouse 和 StarRocks。

  ClickHouse 是一个开源的列式数据库管理系统,可用于 OLAP 分析。它采用列式存储,与传统的行式存储相比,这种设计在处理分析型查询时更为高效,因为它能够快速读取和聚合列数据,无需加载整个行。ClickHouse 的 MPP 架构允许查询任务被拆分为多个子任务,并在集群的多个节点上并行执行。每个节点都配备独立的的处理器和存储资源,使得系统能够充分利用集群的计算和存储能力,大幅提升查询速度和系统吞吐量。此外,ClickHouse 的 MPP 架构还支持数据复制和分片,提高数据的可用性和查询性能。即使某个节点发生故障,其他节点也能迅速接管任务,确保服务的连续性。ClickHouse 是用 C++ 编写的,它在单核性能上进行了深度优化。

  StarRocks 也是一款高性能分析型数据仓库,可实现多维、实时、高并发的数据分析。StarRocks 采用了向量化、MPP 架构、CBO 优化器、智能物化视图和列式存储引擎等先进技术,因此与同类产品相比,在查询效率上具有较大优势。StarRocks 能够高效地从各类实时和离线数据源导入数据,并直接分析数据湖中的多种格式数据。StarRocks 兼容 MySQL 协议,常用 BI 工具能轻松接入。此外,StarRocks 支持水平扩展,确保了高可用性、可靠性和易于维护。

  在小红书内部,StarRocks 版本以存算一体架构为主,其中前端(FE)负责元数据管理和构建执行计划,而后端(BE)则负责数据存储和计算。这种架构使得查询能够直接在 BE 节点上本地执行,避免数据传输与拷贝开销,从而实现极速的查询分析性能。存算一体架构还支持数据的多副本存储,提升了集群在高并发环境下的查询能力和数据可靠性。  

  StarRocks 对算子和函数进行了向量化加速,并通过 Pipeline 调度框架,充分利用多核计算能力,提升查询性能。虽然 StarRocks 和 ClickHouse 在单表查询性能上相近,但 ClickHouse 在查询并发和不支持分布式 Join 的局限性,使其不适合作为生产数仓模型的查询加速引擎。因此,我们选择了 StarRocks 替换原有的 Cube 计算,期望在数据处理和分析方面达到更高的性能和效率。  

  

  为了提升离线数仓的产出效率,我们对架构进行如下优化:

  直接导入:将 DM 表、DWS 表和常变维度的 DIM 表直接导入 StarRocks 中,简化数据处理流程。

  Cube 表建模:在 StarRocks 中完成计算密集型的 Cube 表建模,以提高数据处理速度。

  计算 UV 的一般方式是使用 count distinct ,它能够保留原始数据的明细,有较高的灵活性。然而,由于在查询执行的过程中需要进行多次 shuffle(跨节点通过网络传输数据),会导致查询性能随着数据量增大而直线下降。

  以下面的 SQL 为例,示例 1 :

  其执行过程中,首先会构建一个中间表 tb1,并扩展出三个虚拟维度:c1、c2 和 c3。

  c1: if(buy_num>0, user_id,null)

  c2: if(imp_num>0, user_id,null)

  c3: if(click_num>0, user_id,null)

  因为有三个 count distinct 的维度,数据也会扩展为三倍。随后经历三轮 shuffle 才能得出结果。该过程中数据会膨胀,因此 shuffle 的数据量会比较大。  

  针对 Cube 表中的 id 消重指标,如用户数、商品数等,我们采用了 BitMap 技术。BitMap 基本原理是用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。与传统的 count distinct 方法相比,BitMap 消重在空间和时间上都显示出显著优势:

  空间优势: BitMap 通过一个 bit 位标记 id 的存在,可看作是对一个集合的压缩结构,大幅减少了存储需求。比如对 int32 去重,使用普通BitMap 所需的存储空间只占传统去重的 1/32。StarRocks 采用的 Roaring Bitmap,能进一步降低稀疏数据的存储空间。

  时间优势: BitMap 去重的计算操作,分为对给定下标的 bit 置位和统计 bitmap 的置位个数,时间复杂度分别为O(1)和O(n),且后者可使用 clz、ctz 等指令高效计算。此外, BitMap 去重技术在 MPP 执行引擎中还可以并行加速处理,每个计算节点独立地生成其对应的子 BitMap,然后通过 bitor 操作高效地将这些子 bitmap 合并为一个完整的去重结果。与传统的基于排序(sort)或哈希(hash)的去重方法相比,bitor 操作不仅减少了数据的无条件依赖和依赖关系,还能够实现向量化处理,从而大幅提升去重操作的效率和性能。

  BitMap 大小取决于最大 id 值,直接关系到查询的稳定性和性能。StarRocks内置的编码函数能够将字符串类型的 id 转换为 64 位的数字 id,但这样的转换可能导致生成的数字 id非常大,影响性能和稳定性。为了解决这个问题,我们引入了编码表,它的作用是将字符串 id 映射到一个更小范围的数字 id,随后我们把数字 id 转化为 BitMap。

  编码表的逻辑类似于数据库的自增逻辑,即首个 id 对应的数字是 1,后续每新增一个 id,对应的数字 id 就自增 1。从而保证每个字符串 id 都会拥有一个唯一的数字 id,也有效缩小了 BitMap 占用的存储。

  那么经过 BitMap 改造的任务,示例 1 中的 SQL 执行过程就变成了下图的执行过程。shuffle 数据量等于原表数据量,并且只需要一轮 shuffle。  

  使用过程中,Cube 计算会占用大量的 CPU 资源和内存资源。在我们的应用场景中,需要处理的 Cube 数量多达上百个,这对 StarRocks 来说,经常会导致内存溢出(OOM)的情况。StarRocks 在执行 SQL 查询时,一般会将所有数据置于内存中,且计算过程中的数据不会 Spill 到磁盘上。为解决资源瓶颈这一问题,我们从两个方面进行了优化:

  控制 DM 表和 DWS 表的规模,这包括控制表的行数、列数、以及单字段大小;可有效减少数据表展用的资源。

  优化 SQL 写法。Cube 计算的核心原理是将数据扩展为 n 份(由 Cube 的数量决定),然后进行聚合操作。为了减少在扩展过程中产生的数据量,我们根据集群的规模和能力,将复杂的 SQL 查询拆分成多个较小的批次。通过分批次提交这些查询,巧妙地利用时间来换取所需的计算空间,从而避免了一次性处理大量数据导致的资源不足问题。  

  为了提升查询效率,数据仓库通常会在 APP 层创建多个 Cube,从一张宽表派生出多个针对不同业务场景的 Cube 表。这些 Cube 表虽然优化查询效率,但并不承担指标定义的功能。在不降低查询效率的前提下,StarRocks 提供了物化视图简化数据模型。物化视图本质是预先计算并存储在 StarRocks 中的数据,它对用户透明,在查询时自动将请求重定向到已计算好的数据集,从而减少了数据处理量并加快了查询速度。

  例如,如下图所示,未使用物化视图的查询(左侧)需要从基础底表中提取数据,而启用物化视图后(右侧),查询直接访问优化后的数据,物化视图的数据是底表数据关联聚合而来,可以显著减少数据量和提升查询速度。  

  对于离线数据的物化视图,一般为定时调度,其调度类似于天级离线任务,因此其调度不会对资源造成过多占用。

  在数据产品中,用户的查询往往遵循一定规则、灵活度受制于产品,这为物化视图提供了优化的机会。所有依赖同一张宽表的指标都可以通过物化视图得到加速,而无需在多个表中重复定义。这样,物化视图在后台静默地提高了查询效率。

  此外,StarRocks 通过 Colocation Join 功能进一步加速表的连接操作。该功能将一组具有相同分布的表分片组织成一个集合,并确保这些 Table 的分桶副本位于同一组节点上。在执行分桶列上的 Join 操作时,可以在本地节点上直接完成,减少数据在节点间的传输耗时。  

  5.1 案例背景

  业务运营团队的组织架构调整导致行业类目不定期变更,多个数据产品如 OneDash(公司/业务经营看板)、鹰眼(电商运营平台)、交易核心看板以及核心宽表都需要进行数据回刷。传统的 Spark 任务回刷成本高昂,迫切需要优化。

  5.2 链路改造  

  以交易核心看板和 OneDash 为例,原先的数据处理完全依赖于 Spark 引擎。出于性能考虑,商品行业的 Cube 表细分为两个版本:一个包含行业新老客户信息,另一个则不包含。然而,从业务需求出发,这两个版本的 Cube 表实际上可以合并为一张。鉴于 Cube 表计算的执行时间占比最大,可以将这一计算过程迁移至 StarRocks 平台,提升效率。  

  如上图所示,改造后的新链路经过优化,最终对外只开放两个 Cube 表:商品行业新老客 Cube 表、商家行业 Cube 表。

  商品行业新老客 Cube 表整合了老链路中的两个独立表——商品行业新老客 Cube 和商品行业 Cube。新表直接依赖于一张综合的商品行业用户渠道宽表,该宽表包含了商品行业和新老客户维度的关键信息以及多种指标。这一合并减少了维护的复杂性。

  商家行业 Cube 表的链路也类似,它依赖于商家行业用户渠道宽表,而这个宽表本身依赖于商品行业用户渠道宽表产出。

  这样设计的原因:1)保证商品行业 Cube 指标和商家行业 Cube 指标的一致性;2)StarRocks 中的关联操作可以使用 Colocate Join,效率比 Spark 要高。

  分不同维度计算用户数、商品数和商家数时,我们会先对 user_id、spu_id 和 seller_id 进行编码,然后在中间表中构建对应的 BitMap。

  5.3 回刷链路  

  面对行业变更,我们采取主备链路的策略来应对涉及多个数据产品的复杂回刷任务。主链路负责持续为线上产品提供实时服务,而备链路则专门用于执行数据的回刷操作。

  在行业发生变更时,业务数据仓库会根据最新的行业映射信息,重新构建备链路上的商品行业和商家行业维表。与此同时,主链路上的维表保持原行业映射不变,确保业务连续性。回刷过去两年的数据,包括商家行业维表、行业新老客维表,以及最新一天的商品行业维表。

  历史数据的回刷通过将商家行业和行业新老客维表的数据导入到 StarRocks 中来完成,而对于商品行业维表,只需回刷最新一天的数据。

  接着我们更新商品行业维表下游的维表依赖关系,使其指向最新日期的数据,并调度起各业务的 Cube 回刷链路,对近 2 年的数据进行全面更新。这一整个过程都是通过 StarRocks SQL 任务来实现的。数据调度平台则负责执行回刷计划,关键表会部署数据质量检测任务(DQC),保证回刷过程中的数据符合预期。

  一旦所有的 Cube 回刷任务完成,我们便可以调度同步任务,利用 StarRocks 的外表导入功能,将备集群的更新结果同步到主集群中。这样的同步操作确保了主链路数据的及时更新,同时也保障了数据的完整性和业务的连续运行。

  5.4 收益

  通过将回刷链路部署到 StarRocks 集群中,我们实现了资源的高效利用,无需申请其他额外资源。同时,主链路的运行依托于现有的线上集群,没有额外消耗。这次链路改造带来的主要收益可以分为两大类:

  回刷收益:以最近一轮的回刷为例,回刷 2022 年和 2023 年共计两年的数据。我们对比了基于 Spark 和基于 StarRocks 的链路性能。结果显示,StarRocks 链路在资源消耗和成本上都有显著的减少,回刷时间节省 90%,回刷成本降低 99%。具体来说,资源消耗从上千万 GBHour 降低到 几十万 GBHour,成本从上百万元大幅下降到几千元,回刷时间从一个月缩短到几天。

  日常收益:在日常数据处理方面,StarRocks 链路同样展现出色。与 Spark 链路相比,StarRocks 没有额外资源消耗,每天的数据产出时间提前了 1.5 小时以上,数据处理时间缩短至几分钟,这样的改进不仅加快数据处理速度,还提高整体的工作效率。  

  OLAP 引擎在实时数仓建设方面已经得到了广泛的应用。我们的实践证明,结合业务特点,在处理中小规模数据量时,使用 StarRocks 等分布式 OLAP 引擎替换 Spark ,承担更多的离线处理任务,可以显著提高数据仓库产出的速度和效率,达到降本增效的目的。

  展望未来,我们计划进一步探索 StarRocks 在湖仓一体和存算分离的应用场景,以构建更加高效、灵活的数据生产链路和自助分析产品。我们期待通过这些创新实践,能够为公司带来更强大的数据处理能力,支持业务的持续增长和决策的精准性。

0
相关文章