发布时间:2025-03-07 17:53:47 点击量:
HASH GAME - Online Skill Game GET 300
这是一个典型的流式湖仓架构,首先业务数据会存储在 MySQL 表中,然后借助 Flink 及其 CDC Connector 的作业,将这些数据库的数据同步到 Paimon 的 ODS 层中,从而构成 ODS 层数据。这层数据实际上包含了 MySQL 的全量数据,并且会根据 MySQL 的更新实时地进行相应的更新。在有了 ODS 层数据之后通常会进行数据过滤,并进行数据的 Join 操作,以生成一个宽表,这就是 DWD 层的数据。DWD 层数据形成后会进一步进行数据过滤、数据的聚合和打宽表等操作,以生成 DWS 层的数据,用于进行指标的统计。这就是一个简化的流式湖仓分层设计。在这个分层设计中, Apache Flink 与 Paimon 是两个至关重要的组件。
但幸运的是 Flink 提供了一个非常有效的机制来实现宽表构建,即 Partial Update 。如图所示它可以使用两个 Flink 分别对一个表中的不同字段进行更新。例如左边的表只需要对 Column A 进行更新,并将 Column B 设置为 null ,而另一个 Sink 则可以对 Column B 进行更新。这样当下游读取这个表时,它会自动将这两个部分合并起来,将 null 值替换为对应的实际值。
在实际使用过程中经常会遇到一个问题:当一个作业包含多个Source ,并将数据写入同一个 Paimon 表时,如果多个 Flink 尝试同时对该表进行 Compaction 操作,Paimon 通常不支持这种行为。这会导致作业在执行 Compaction 时失败,进而引发作业持续 Failover ,最终导致作业不可用。为了解决这个问题,用户可以通过配置来关闭作业的自动 Compaction 功能。然而,这样做意味着需要启动另一个专门的作业来对该表执行 Compaction 操作。在很多情况下,用户可能只希望通过一个作业的多个 Flink 来完成必要的 Passbook 操作,而不希望额外启动一个专门的作业来进行 Compaction 。但遗憾的是当前可能无法直接让该作业自行处理 Compaction 。
为了实现这一目标需要在 Flink 的 Single Planner 中进行一些改造。改造完成后, Flink Single Planner 将能够自动识别是否多个 Flink 组件正在向同一张表写入数据。在满足特定条件时将多个 Flink 的上游结果进行 Union 操作,并仅使用一个 Flink 组件来统一写入所有数据。这样在进行 Compaction 操作时,就只有一个判断逻辑,从而避免了之前提到的冲突问题。
由于 Paimon 通常是 bucket 存储的,在主键表的情况下,分桶是一种常见的做法。然而为什么事实表不能根据Paimon表的分桶分配方式进行 shuffle 呢?原因是目前还没有办法让 Paimon 表指定事实表的 shuffle 方式。因此在 Flink Single Planner 中引入了一个名为“Support Lookup Custom Shuffle”的接口。这个接口的本质是允许 connector 为维表实现指定事实表的数据 shuffle 方式。有了这个接口后 Paimon 表的维表就能够执行一些特定的操作了。
首先关于 Paimon 的维表, Paimon 的主键表中包含两种分Bucket 的方式。最简单的一种是 Fixed Bucket 。Fixed Bucket 指的是在作业定义时,而非在 Paimon 表定义时,就已经确定了 Bucket 的数量。对于任意一条数据,其对应的 Bucket 可以通过一个简单公式计算得出。本质上这个计算过程是对 Bucket Key 取哈希值,然后再对 Bucket 的总数取模,从而确定数据具体属于哪个 Bucket 。实际上只需要让事实表也按照同样的方式进行 shuffle 。例如在事实表中可以将 K 1 和 K 2 分配到 Lookup 算子上。这个 Lookup 算子知道,它只需要读取 Bucket 1 的数据,并且只需将 Bucket 1 的数据存储在本地即可。通过这种方式,可以大大降低每个 Lookup 算子的并发量,减少其需要读取的 Paimon 数据量。同时也能降低其实际要存储到本地以及内存中的数据量。
但是依然可以做一些事情来应对这种情况,即可以通 Custom Shuffle 接口来指定其 Sort of 的方式。这里的分配方式是指根据 Join Key 取一个哈希值,然后在取模时根据下游 Subtask 的数量,即 Lookup Join 的 Subtask 数量来进行。这时每个 Lookup 算子,或者说每个 Lookup 的并发实例,在读取维表时就会知道它可能会接收到哪些与事实表相关的数据。因此它就可以对其存储的缓存进行一些裁剪。
比如说,虽然事实表仍然是按照如 K 1 、 K 3 这样的 Key 发送给上游的并发实例,但这些并发实例在读取数据时还是需要全量读取。但是当数据存储到本地时可以进行过滤,只存储与 K 1 和 K 3 相关的数据。因为他知道事实表的分配算法策略是他指定的,所以他可以只存储与 K 1 、 K 3 相关的维表数据。尽管在读取时仍然需要访问全量的数据,但实际上他只需要在本地保存一部分维表数据。这就是针对 Lookup Join 所做的一个优化策略。
比如说在这个例子中,设定 N 等于 2 ,事实表中的 K 1 可能会被分发到第一个和第二个并发实例上,从而尽可能地将一个 Hot Key 打散。由于需要进行一个类似于复制的操作,因此第一个和第二个 Lookup 并发算子都需要额外读取一个 Bucket 。这实际上是一个 Trade-off 。如果将 N 设置得很大,那么数据被打散得会更加平均,但每个算子需要读取的数据量也会相应增加。以上就是关于数据读写以及查询优化方面的一些讲解。
具体来说在 1.18 及 Paimon 0.9 之前的版本中,要求在执行 COPY 或类似操作时,必须按照定义的顺序填入所有参数,即使某些参数有默认值且用户并不关心,也必须用空字符串来替代。例如在进行 Compact 操作时,如果希望对名为 Default 的表进行 Compact ,并将并发度设置为 4 ,理论上只需要传递两个参数。但在实际操作中,在参数列表中需要填入三个空字符串,这无疑增加了用户的操作难度,降低了使用的便捷性。
在 Flink 1.18 之后引入了一个名为 Named Argument 的功能。这一功能允许以任意顺序传入参数,并且只需填写必要的参数即可。在相同的场景下只需要填写表名和并发度配置即可。此外之前提到 Paimon 提供了许多 Action ,包括进行 Compaction 、 Snapshot 管理,以及 Clone Table 或 Clone Database 等操作,然而在之前的版本中发现许多 Action 并没有对应的 Flink Procedure 实现。这导致在使用时,需要通过 Action 的流程来完成操作。
Action 的流程通常是怎样的呢?以创建一个表的操作为例。在 0.9版本之前,当 Paimon 还没有实现对应的 Procedure 来创建空表时,用户首先需要从 Paimon 官网上下载一个 Action 的 JAR 包。然后,用户需要将这个 JAR 包上传到 Flink 的运行环境上。接下来,用户需要通过执行 Flink Run 命令来启动创建空表的作业。同时,作业的参数也需要通过命令行参数来传入。这样的操作方式显然不够便捷。
接下来展望一下未来的技术发展方向。首先关于前面提到的 Range Partition ,即无主键表,它已经实现了对用户指定列的排序功能,这一功能已经在 Paimon 0.9 版本中发布。此外刚刚提到的 Procedure 易用性优化也已经在该版本中得以呈现。现在大家应该都能开始使用这些功能了。值得一提的是 Paimon 是首个在第一时间对接了 Flink 2.0 的 Materialized Table 功能的平台,这一功能将在 Paimon 1.0 版本上线时与大家见面。