Apache Flink是目前大数据领域最流行的流批一体化计算引擎,而数据湖技术也是互联网时代的产物,以Iceberg、Hudi和Delta为代表的数据湖技术应运而生。Iceberg目前已经提供对Apache Flink 1.11.x的集成支持,Flink可以通过DataStream API/Table API将数据写入Iceberg。
新浪和微博有强大的用户群体,目前积累的数据已经达到几百PB。微博的技术通常会采集应用App的埋点数据以及应用服务日志之类的数据,这些数据通过Kafka消息中间件接入数据仓库。
在微博的数据仓库中,有多种大数据存储组件,譬如Hive/HBase/HDFS,计算引擎有MapReduce、Spark、Flink,根据用户不同的需求,会应用不同的技术在大数据平台中计算,将结果保存到MySQL、ES等支持快速查询的关系型、非关系型数据库中,接下来应用层就可以基于这些数据进行BI报表开发、用户画像,或者基于Presto这种OLAP工具进行统计查询。
在整个数据处理的过程中,我们会借用调用系统来调度应用程序,定期(T+1或者H+1)去执行一些Spark任务。离线数据处理的整个过程中存在着大量的数据延迟现象,这些数据可能是T+1输出或者是H+1输出。但是,业务方已经不再满足于这些离线处理数据的方式,因此,我们也用Flink+Kafka去构建了实时流数据处理系统。
如下图,就是原来使用的Lambda架构,Lambda架构将数仓分成离线层和实时层。也就是说,同一份数据会被处理两次以上,同一套业务逻辑需要适配两次开发。
例如在实时场景下计算PV、UV时,我们会用实时技术计算,这些数据指标会离开呈现出来。但是,我们有时候需要做趋势分析,需要每天再重新计算一次PV、UV数据,比如在凌晨3点的时候,用Spark在调度系统上把前一天的数据全部重新再跑一遍。
很显然,这两个过程运行的时间不一致,跑的数据却是完全相同的。重新跑一遍离线分析的数据,数据的更新成本很高,更严重的是二者的数据可能不一致(比如有延迟数据产生,离线数据比实时更准确)。
为了解决Lambda架构的痛点,就产生了Kappa架构,相信大家对这个架构也非常熟悉。
Kappa架构解决了Lambda架构中离线和实时数据间不一致、运营和开发成本加班的问题,但是Kappa架构也有痛点。
首先,我们需要借用Kafka来构建实时场景,但是如果需要对ODS层数据做进一步的分析时,就要接入Flink计算引擎把数据写入到DWD层的Kafka,同样也会将一部分结果数据写入到DWS层的Kafka。但是,如果想做简单的数据分析时,又要将DWD和DWS层的数据写入到ClickHouse、ES、MySQL或者是Hive里做进一步分析,这无疑带来了链路的复杂性。
其次,Kappa架构是严重依赖于消息队列的,我们知道消息队列本身的准确性严格依赖它上游数据的顺序,但是,消息队列越多,发生乱序的可能性越大。通常情况下,ODS层的数据是绝对准确的,把ODS层数据经过计算之后写入到DWD层时就会产生乱序,DWD到DWS更容易产生乱序,这样的数据不一致性问题非常大。
那么有没有一种架构,既能满足实时性的需求,又能满足离线计算的需求,同时还能减轻运营开发成本?解决Kappa架构的痛点呢?
是否有一种技术,既能够保证数据高效的回溯能力,支持数据更新,又能够实现数据的流批读写,并且还能够实现分钟级别的数据接入。
这也是建设实时数据仓库的迫切需要,实际上需要对Kappa架构进行改进升级,以解决Kappa架构中遇到的问题,接下来我们会进一步探讨数据湖技术--Iceberg。
官网对Iceberg的描述如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Trino and Spark that use a high-performance format that works just like a SQL table.
Iceberg的官方定义是一种表格式,可以理解为是基于计算层(Spark、Flink)和存储层(ORC、Parquet、Avro)的中间介质层,用Flink或者Spark将数据写入Iceberg,然后通过Presto、Flink、Spark来读取这些表。
Iceberg主要是为分析海量数据计算的,被定义为Table Format,Table Format介于计算层和存储层之间。
Table Format向下管理存储系统上的文件,向上为计算层提供接口。比如一张Hive表,在HDFS上会有Partition,存储格式,压缩格式和数据的HDFS目录等,这些信息都维护在元数据中,这些元数据被称为一种文件的组织形式。
Iceberg能够高效支撑上层的计算层访问磁盘上的文件。
Iceberg目前支持三种文件格式,Parquet、ORC、Avro,Iceberg的主要功能如下:
自带ACID能力:保障每次写入后的数据都是一个完整的快照(snapshot),每个snapshot包含着一系列的文件列表,落地任务把数据直接写入Iceberg表中,不需要任务再做额外的success状态维护。Iceberg会根据分区字段自动处理延时到来的数据,把延时的数据及时的写入到正确的分区,因为有ACID的保障,延时数据写入过程中Iceberg表依然提供可靠的读取能力。
基于MVCC(Multi Version Concurrency Control)的机制,默认读取文件会从最新的的版本,每次写入都会产生一个新的snapshot,读写相互不干扰。
基于多版本的机制可以可用轻松实现回滚和时间旅行的功能,读取或者回滚任意版本的snapshot数据。
下图是 Iceberg 整个文件的组织架构。从上往下看:
如下图所示,虚线框(snapshot-1)表示正在进行写操作,但是还没有发生commit操作,这时候 snapshot-1 是不可读的,用户只能读取已经 commit 之后的 snapshot。同理, snapshot-2,snapshot-3表示已经可读。
可以支持并发读,例如可以同时读取S1、S2、S3的快照数据,同时,可以回溯到snapshot-2或者snapshot-3。在snapshot-4 commit完成之后,这时候snapshot-4已经变成实线,就可以读取数据了。
例如,现在current Snapshot 的指针移到S3,用户对一张表的读操作,都是读 current Snapshot 指针所指向的 Snapshot,但不会影响前面的 snapshot 的读操作。
Iceberg的每个snapshot都包含前一个snapshot的所有数据,每次都相当于全量读取数据,对于整个链路来说,读取数据的代价是非常高的。
如果我们只想读取当前时刻的增量数据,就可以根据Iceberg中Snapshot的回溯机制来实现,仅读取Snapshot1到Snapshot2的增量数据,也就是下图中的紫色数据部分。
同理,S3也可以只读取红色部分的增量数据,也可以读取S1-S3的增量数据。
Iceberg支持读写分离,也就是说可以支持并发读和增量读。
目前Flink社区现在已经重构了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我们采用的也是社区提供的FlinkIcebergSink,曲线框中的这块内容是 FlinkIcebergSink。
多个 IcebergStreamWriter 和一个 IcebergFileCommitter 的情况下,在上游的数据写到 IcebergStreamWriter 的时候,每个 writer 里面做的事情都是去写 datafiles 文件。
当每个 writer 写完自己当前这一批 datafiles 小文件的时候,就会发送消息给 IcebergFileCommitter,告诉它可以提交了。而 IcebergFileCommitter 收到信息的时,就一次性将 datafiles 的文件提交,进行一次 commit 操作。
commit 操作本身只是对一些原始信息的修改,让其从不可见变成可见。
在实际的生产环境中,Flink 实时作业会一直在集群中运行,为了要保证数据的时效性,一般会把 Iceberg commit 操作的时间周期设成 30 秒或者是一分钟。当 Flink 作业跑一天时,如果是一分钟一次 commit,一天需要 1440 个 commit,如果 Flink 作业跑一个月commit 操作会更多。甚至 snapshot commit 的时间间隔越短,生成的 snapshot 的数量会越多。当流式作业运行后,就会生成大量的小文件。
Iceberg 小文件合并是在
org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。社区中小文件合并其实是通过 Spark 并行计算的,我们参考了社区Spark的实现方法,自己封装了使用Flink合并小文件的方法。
我们知道Iceberg支持读写分离,又支持并发读、增量读、合并小文件,而且还能做到秒级/分钟级的数据延迟。我们基于Iceberg这些优势,采用Flink+Iceberg的方式构建了流批一体化的实时数据仓库。
在数据仓库处理层,可以用 presto 进行一些简单的查询,因为 Iceberg 支持 Streaming read,所以在系统的中间层也可以直接接入 Flink,直接在中间层用 Flink 做一些批处理或者流式计算的任务,把中间结果做进一步计算后输出到下游。
希望借助Alluxio构建一个数据湖加速功能,以便在查询层实现秒级分析功能。
建立自动Schema建表的功能。
和所有业务系统打通,年内迁移完成所有业务线的数据,完整全部数据入湖建设。
原创不易,欢迎点赞加关注,您的关注是我持续创作的动力。