
本文主要介绍如何快速的通过Spark访问Iceberg table。
Spark通过DataSource和DataFrame API访问Iceberg table,或者进行Catalog相关的操作。由于Spark Data Source V2 API还在持续的演进和修改中,所以Iceberg在不同的Spark版本中的使用方式有所不同。
| 功能 | Spark 2.4 | Spark 3.0 |
|---|---|---|
| 基于DataFrame | ||
| - 读数据 | 支持 | 支持 |
| - 读元数据 | 支持 | 支持 |
| - 追加(append) | 支持 | 支持 |
| - 覆盖(Overwrite) | 支持 | 支持 |
| - V2 source专属操作,如create, overwrite | 不支持 | 支持 |
| 基于Spark SQL | ||
| - SELECT | 通过DataFrame的temporary view | 支持 |
| - DDL | 不支持(仅能通过Iceberg API) | 支持(通过Catalog) |
| - DML | 不支持 | 支持 |
Iceberg内部支持Hive和Hadoop两种catalog:
| Catalog类型 | Metadata JSON管理 | Namespace |
|---|---|---|
| Hive catalog | Hive MetaStore | 1级,即DB |
| Hadoop catalog | 文件系统上的某个文件 | 多级,对应多级目录 |
后文以Hive catalog为主做介绍。Hive catalog需要Hive MetaStore的支持。注意其有多种配置方式,其中内嵌的Derby数据库仅仅用于实验和学习,不能用于生产环境。
<SPARK_HOME>/f需要加入如下配置,使Iceberg能够访问Hive MetaStore:
spark.astore.uris thrift://<HiveMetaStore>:9083spark.astore.warehouse.dir hdfs://<NameNode>:8020/path
如何使用社区正式发布的版本:
spark-shell --packages org.apache.iceberg:iceberg-spark-runtime:0.7.0-incubating
如何本地打包,并把Iceberg放入Spark的classpath:
git clone .gitcd incubator-iceberg# master branch supports Spark 2.4.4./gradlew assemblespark-shell --jars <iceberg-git-working-directory>/spark-runtime/build/libs/iceberg-spark-runtime-<version>.jar
Spark 2.4只能读写已经存在的Iceberg table。在后续的操作前,需要先通过Iceberg API来创建table。
读取是通过DataFrameReader并指定iceberg作为format来访问Iceberg table,随后Iceberg内部的逻辑会根据path来判断访问的是Hive catalog下的table,还是用文件系统的路径表示的Hadoop table。
// Table managed by Hive catalog.format("iceberg").load("db.table")// Hadoop table, identified by a path.format("iceberg").load("hdfs://<NameNode>:8020/<path_to_table>")
Iceberg会判断path中是否含有"/"。如果是,则认为是一个用路径表示Hadoop table;否则,会去Hive catalog中寻找。
在读取时,通过option指定as-of-timestamp或者snapshot-id来访问之前某一个snapshot中的数据:
// Time travel to October 26, 1986 at 01:21:00.format("iceberg").option("as-of-timestamp", "499162860000").load("db.table")// Time travel to snapshot with ID 10963874102873L.format("iceberg").option("snapshot-id", 10963874102873L).load("db.table")
snapshot-id的获取方法,可以参考后文中访问元数据中snapshot的部分,或者直接查看元数据文件的内容。
在DataFrame的基础上,创建local temporary view后,也可以通过SQL SELECT来读取Iceberg table的内容:
val df = ad.format("iceberg").load("db.table")spark.sql("""SELECT * FROM view""").show()
Spark 2.4可以通过DataFrameWriter并指定iceberg作为format来写入Iceberg table,并支持append和overwrite两种模式:
// Appenddf.write.format("iceberg").mode("append").save("db.table")// Overwritedf.write.format("iceberg").mode("overwrite").save("db.table")
有如下几点需要注意:
Overwrit的行为dynamic overwrite,即当某个partition中含有输入DataFrame中的行的时候,该partition才会被新数据完全覆盖;其他partition则保持不变。而Spark 2.4中原生数据源(如parquet)的默认行为是static overwrite;
操作粒度是文件级别,并不是行级别;
mode必须显式指定,没有默认行为。
Iceberg支持通过DataFrameReader访问table的元数据,如snapshot,manifest等。对于Hive table,可以在原table name后面加.history、.snapshots等表示要访问元数据;对于用路径来表示的Hadoop table,需要在原路径后面加#history等。例如:
// Read snapshot history of db.table.format("iceberg").load("db.table.history")
结果如下:
+-------------------------+---------------------+---------------------+---------------------+| made_current_at | snapshot_id | parent_id | is_current_ancestor |+-------------------------+---------------------+---------------------+---------------------+| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true || 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true || 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false || 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true || 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true || 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |+-------------------------+---------------------+---------------------+---------------------+
又如:
// Read snapshot list of db.table.format("iceberg").load("db.table.snapshots")// Read manifest files of db.table.format("iceberg").load("db.table.manifests")// Read data file list of db.tabe.format("iceberg").load("db.table.files")
可以进一步将history和snapshot按照snapshot id做join,来查找snapshot id对应的application id:
.format("iceberg").load("db.table.history").createOrReplaceTempView("history").format("iceberg").load("db.table.snapshots").createOrReplaceTempView("snapshots")
SELECTh.made_current_at,s.operation,h.snapshot_id,h.is_current_ancestor,s.summary['spark.app.id']FROM history hJOIN snapshots sON h.snapshot_id = s.snapshot_idORDER BY made_current_at
结果如下:
-------------------------+-----------+----------------+---------------------+----------------------------------+| made_current_at | operation | snapshot_id | is_current_ancestor | summary[spark.app.id] |+-------------------------+-----------+----------------+---------------------+----------------------------------+| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | application_1520379288616_155055 || 2019-02-09 16:24:30.13 | delete | 29641004024753 | false | application_1520379288616_151109 || 2019-02-09 16:32:47.336 | append | 57897183625154 | true | application_1520379288616_155055 || 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true | application_1520379288616_152431 |+-------------------------+-----------+----------------+---------------------+----------------------------------+
Iceberg在Spark 3.0中,作为V2 Data Source,除了上述Spark 2.4所有的访问能力外,还可以通过V2 Data Source专属的DataFrame API访问;同时,受益于external catalog的支持,Spark SQL的DDL功能也可以操作Iceberg table,并且DML语句支持也更加丰富。
在<SPARK_HOME>/f加入如下配置:
spark.sql.catalog.catalog-nameample.YourCatalogClass
df.writeTo("catalog-name.db.table").overwritePartitions()
相较于Spark 2.4,Spark 3.0可以省去DataFrameReader和创建local temporary view的步骤,直接通过Spark SQL进行操作:
-- Create tableCREATE TABLE catalog-name.db.tabe(id INT, data STRING)USING icebergPARTITIONED BY (id)-- InsertINSERT INTO catalog-name.db.tableVALUES (1, 'a'), (2, 'b'), (3, 'c')-- DeleteDELETE FROM catalog-name.db.tableWHERE id <> 1-- UpdateUPDATE catalog-name.db.tableSET data = 'C' WHERE id = 3-- Create table as selectCREATE TABLE catalog-name.db.tableUSING icebergAS SELECT id, dataFROM catalog-name.db.table1WHERE id <= 2
我们作为社区中spark-3分支的维护者,正在持续推进新功能的开发和合入,让更多的人受益。
本文作为Iceberg的快速入门,介绍了如何通过Spark访问Iceberg table,以及不同Spark版本的支持情况:
Spark 2.4可以通过DataFrame读取或修改已经存在的Iceberg table中的数据,但建表、删表等DDL操作只能通过Iceberg API完成;
Spark 3.0访问Iceberg table的能力是Spark 2.4的超集,可以通过Spark SQL配合catalog,进行SELECT、DDL和DML等更多的操作。
随着Iceberg自身功能的完善(如向量化读取,merge on read等),以及上下游对接和生态的丰富,Iceberg作为优秀的表格式抽象,在大数据领域必然会有更好的发展。
本文发布于:2024-01-30 16:59:09,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170660515021501.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
| 留言与评论(共有 0 条评论) |