网上关于 Hudi 和 Iceberg 对比的内容有很多,比如 Iceberg 对 Schema 友好,Hudi 支持 Upsert 等优劣点的对比,这些内容很大程度上已经过时,在未来的几个月内,我们就能看到大部分关键功能在两个框架上的打平,所以非常有必要相对全面地了解两个框架的背景、设计思想、功能细节等。
本文针对 Hudi 的机制做了相对较全面的梳理,由于具体内容不涉及到源码的具体细节,很多并不熟悉数据湖的同学也可以在这里了解到其技术上的全貌和亮点。
先说背景,Hudi(Hadoop Upsert anD Incremental),从 Uber 内部孵化出来的开源项目,最初是用于解决数仓中 Lambda 架构中数据一致性的问题,将增量处理模型替代流式处理模型,并提供了 Upsert 和 Incremental Pull 两个非常重要的 feature。
这里可以提一下,Hudi 内部主打的一个场景,就是乘客打车下单和司机接单的匹配,乘客和司机分别是两条数据流,通过 Hudi 的 Upsert 能力和增量读取功能,可以分钟级地讲这两条数据流进行拼接,得到乘客-司机的匹配数据。
(还记得两年前 Hudi 对自身的定位是一个 Pipeline 或者存储框架,现在官网的描述已经变成了「下一代的流式数据湖平台」,继商业化之后,想象空间也上升了好几个 Level。)
Timeline 可以理解为 Hudi 表的一个时间线,记录了 Hudi 表在不同时刻的信息和行为,这个 Timeline 由 TimelineServer 来管理,通常存在于 Hdfs、RDBMS 等持久化存储介质中。实际上,Hudi 将 Timeline 信息放到每个 Table 内的 .hoodie 目录中,并通过文件名来进行不同 instant 的区分。通过 Timeline 可以方便地做版本管理以及实现增量处理等和版本/时间相关的功能。
Timeline 涉及到 3 个关键概念:
可以看到,所有需要更改表元信息的操作,都是需要将对应的 action 提交至 Timeline,而 Timeline 的操作要保证原子性,一般要由单点进行操作,比如 Hudi 在与 Spark/Flink 结合时,利用 Spark 的 Driver 和 Flink 的 JobMaster 来进行 Timeline 的信息记录。
Hudi 提供了两种表类型,分别为 Copy-on-Write 和 Merge-on-Read,其对应的查询类型如下:
对于 Copy-On-Write Table,用户的 update 会重写数据所在的文件,所以是一个写放大很高,但是读放大为 0,适合写少读多的场景。对于这种 Table,提供了两种查询:
具体的流程见下图 gif:
对于 Merge-On-Read Table,整体的结构有点像 LSM-Tree,用户的写入先写入到 delta data 中,这部分数据使用行存,这部分 delta data 可以手动 merge 到存量文件中,整理为 parquet 的列存结构。对于这类 Table,提供了三种查询:
具体的流程见下图 gif:
MOR 表中可能存在两种文件,在 Hudi 内部被称为 base file 和 log file,其中 base file 通常为 parquet 文件,列存格式,对读取友好,log file 通常为 avro 格式,行存,对写入友好。
Hudi 表有主键的概念,所以 Index 的出现也非常合理,可以用于定位数据的位置以提供更高效的写入和读取操作,不同的 Index 类型提供了不同的粒度:
对于每条 Record,我们会查询/计算 Record 的主键所在索引的方式,来判断是 Insert 还是 Update,以及对应的旧文件的位置。在实时写入的过程中,Index 的查询是最关键的部分之一,索引设计的高效与否直接决定了数据写入的性能和稳定性。(可以之后专门出一篇文章来写这个内容)
示意图如下所示,由外到里分别是:
上面提到,Upsert 的操作和 Index 类型很相关,但是在 Hudi 内部有趣的是,由于初始架构设计的缺陷(并未考虑非 Spark 场景),导致了不同 Connector 在使用 Index 上有非常大的差异。
Hudi 目前支持 Spark 和 Flink,而我们这里也以这两种计算引擎为例,讲解一下 Upsert 具体的实现机制:
Bulk Insert 的操作比较简单,只用于某个 Partition 或者某个 Table 初次初始化时使用,由于没有 Update 操作,所以只需要考虑 Insert 情况,性能相比 Upsert 有非常大的提升。
这里可以看一下 Hudi 官方的 Incremental Query 的示例
// spark-shell
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in
// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
<< · Back Index ·>>