写点什么

Trino 通过 Iceberg 创建表的过程分析

  • 2021 年 12 月 12 日
  • 本文字数:4066 字

    阅读完需:约 13 分钟

Iceberg 表

Iceberg 是一个为大规模数据集设计的通用的表格形式,并且适配了 Trino 、Flink 和 Spark,提供 SQL 化解决方案。在构建数据湖的场景中,计算引擎和 iceberg 的组合为用户提供了 ACID 和多版本支持,支持批/流读写等特性。本文主要介绍 trino 如何构建一个 Iceberg 表。

过程分析

trino 在使用 iceberg catalog 的时候,需要依赖 hive metastore。通过 iceberg connector 创建的表元信息都会同步到 metastore 服务中。

Trino 创建 Iceberg 表流程:



事务准备阶段:

相关代码 IcebergMetadata.beginCreateTable()。

  1. 转换 trino 字段类型为 iceberg 类型,生成 iceberg 表的 schema。

  2. 解析分区字段,trino 的分区字段转换成对应的 iceberg 分区类型。

  3. 通过元数据系统(hms)拿到对应的 database location,创建表的 location。

  4. 创建 operation,trino 自定义了 HiveTableOperation。

  5. 生成文件提交的事务对象。

事务执行阶段:

  1. 事务提交的核心代码在 operation 的 commit 方法内,先根据表的元数据

  2. 生成 iceberg 初始元数据文件。

  3. 封装一个表对象准备做持久化。

  4. 生成文件的过程中是否有异常,有异常就回滚已写的元数据文件。

  5. 提交给 hive metastore 系统持久化表。

在事务提交阶段,能够保证整个过程是事务可回滚的,创建表成功后,文件系统上和 metastore 上都会有对应的表记录,任何一个过程失败都会回滚。


Note:整个过程没有采用 Iceberg 的 HadoopCatalog 和 HiveCatalog,使用了 Iceberg 更底层的接口,独自实现了类似 iceberg 中 hiveCatalog 创建表的过程。


代码流程:


引用相关 Iceberg 接口和方法:

BaseTransaction.commitCreateTransaction()执行 tableOperation 事务

TableMetadataParser.write()写入元数据 json 文件


Iceberg 创建表的 Java 实现

Iceberg 提供了创建表的 Java API,有两种方式创建表,分别采用 hadoop catalog 和 hive catalog 实现。hadoop catalog 主要是在 hadoop 作为存储的场景下,不依赖 hms 创建表。hive catalog 需要依赖 hms,常用在 S3 作为存储的场景。

hadoop catalog 创建表的代码实现:

import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.atomic.AtomicInteger;import java.util.function.Consumer;import java.util.regex.MatchResult;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.iceberg.catalog.Namespace;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.hadoop.HadoopCatalog;import org.apache.iceberg.types.Type;import org.apache.iceberg.types.TypeUtil;import org.apache.iceberg.types.Types;import org.apache.hadoop.conf.Configuration;import org.junit.Test;
import static java.lang.Integer.parseInt;import static org.apache.iceberg.types.Types.NestedField.optional;import static org.apache.iceberg.types.Types.NestedField.required;
public class CreateTableTest {
private static final String NAME = "[a-z_][a-z0-9_]*"; private static final String FUNCTION_ARGUMENT_NAME = "\\((" + NAME + ")\\)"; private static final String FUNCTION_ARGUMENT_NAME_AND_INT = "\\((" + NAME + "), *(\\d+)\\)";
private static final Pattern IDENTITY_PATTERN = Pattern.compile(NAME); private static final Pattern YEAR_PATTERN = Pattern.compile("year" + FUNCTION_ARGUMENT_NAME); private static final Pattern MONTH_PATTERN = Pattern.compile("month" + FUNCTION_ARGUMENT_NAME); private static final Pattern DAY_PATTERN = Pattern.compile("day" + FUNCTION_ARGUMENT_NAME); private static final Pattern HOUR_PATTERN = Pattern.compile("hour" + FUNCTION_ARGUMENT_NAME); private static final Pattern BUCKET_PATTERN = Pattern.compile("bucket" + FUNCTION_ARGUMENT_NAME_AND_INT); private static final Pattern TRUNCATE_PATTERN = Pattern.compile("truncate" + FUNCTION_ARGUMENT_NAME_AND_INT);
// private static final Pattern ICEBERG_BUCKET_PATTERN = Pattern.compile("bucket\\[(\\d+)]");// private static final Pattern ICEBERG_TRUNCATE_PATTERN = Pattern.compile("truncate\\[(\\d+)]");
private static final String CATALOG_NAME = "icebergCatalog"; private static final String DBNAME = "icebergDB"; private static final String TABLE_NAME = "orders"; private static final String WAREHOUSE = "hdfs://10.0.30.10:9000/user/hive/warehouse"; private static Schema SCHEMA; private static PartitionSpec SPEC; private static Map<String, String> properties = new HashMap<>(); private static Configuration conf; private static HadoopCatalog catalog;
public CreateTableTest(){ conf = new Configuration(); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); catalog = new HadoopCatalog(conf, WAREHOUSE); properties.put("format-version", "2"); }
@Test public void createTable() { prepareCreateTable(); TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME); Transaction txn = catalog.buildTable(tableIdent, SCHEMA) .withPartitionSpec(SPEC) .withProperties(properties) .createTransaction(); txn.commitTransaction(); }
@Test public void showTable(){ TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME); Table table = catalog.loadTable(tableIdent); System.out.println(table.toString()); }
public Table getTable(TableIdentifier identifier){ return catalog.loadTable(identifier); }
@Test public void dropTable(){ TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME); catalog.dropTable(tableIdent, true); }
public Table getTable(String dbName, String tableName){ TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(dbName), tableName); return catalog.loadTable(tableIdent); }
public static void prepareCreateTable(){
SCHEMA = buildSchema();
List<String> partitionFields = new ArrayList<>(); partitionFields.add("order_date"); SPEC = parsePartitionFields(SCHEMA, partitionFields);
conf = new Configuration(); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); }
public static Schema buildSchema() { Types.StructType structType = Types.StructType.of( required(0, "id", Types.LongType.get(), "order_id"), required(1, "order_date", Types.DateType.get()), required(2, "account_number", Types.LongType.get()), required(3, "customer", Types.StringType.get()), optional(4, "country", Types.StringType.get(), "customer country") ); AtomicInteger nextFieldId = new AtomicInteger(1); Type icebergSchema = TypeUtil.assignFreshIds(structType, nextFieldId::getAndIncrement); return new Schema(icebergSchema.asStructType().fields()); }
public static PartitionSpec parsePartitionFields(Schema schema, List<String> fields) { PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); for (String field : fields) { parsePartitionField(builder, field); } return builder.build(); }
public static void parsePartitionField(PartitionSpec.Builder builder, String field) { @SuppressWarnings("PointlessBooleanExpression") boolean matched = false || tryMatch(field, IDENTITY_PATTERN, match -> builder.identity(match.group())) || tryMatch(field, YEAR_PATTERN, match -> builder.year(match.group(1))) || tryMatch(field, MONTH_PATTERN, match -> builder.month(match.group(1))) || tryMatch(field, DAY_PATTERN, match -> builder.day(match.group(1))) || tryMatch(field, HOUR_PATTERN, match -> builder.hour(match.group(1))) || tryMatch(field, BUCKET_PATTERN, match -> builder.bucket(match.group(1), parseInt(match.group(2)))) || tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(2)))); if (!matched) { throw new IllegalArgumentException("Invalid partition field declaration: " + field); } }
private static boolean tryMatch(CharSequence value, Pattern pattern, Consumer<MatchResult> match) { Matcher matcher = pattern.matcher(value); if (matcher.matches()) { match.accept(matcher.toMatchResult()); return true; } return false; }}
复制代码


发布于: 3 小时前阅读数: 4
用户头像

还未添加个人签名 2018.10.30 加入

还未添加个人简介

评论

发布
暂无评论
Trino通过Iceberg创建表的过程分析