import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.junit.Test;

import static java.lang.Integer.parseInt;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
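/**
 * Smoke tests for creating, inspecting, and dropping a partitioned Iceberg
 * table through a {@link HadoopCatalog} backed by an HDFS warehouse.
 */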
public class CreateTableTest {
    // Regex building blocks for parsing partition field declarations such as
    // "order_date", "year(order_date)", or "bucket(customer, 16)".
    private static final String NAME = "[a-z_][a-z0-9_]*";
    private static final String FUNCTION_ARGUMENT_NAME = "\\((" + NAME + ")\\)";
    private static final String FUNCTION_ARGUMENT_NAME_AND_INT = "\\((" + NAME + "), *(\\d+)\\)";

    private static final Pattern IDENTITY_PATTERN = Pattern.compile(NAME);
    private static final Pattern YEAR_PATTERN = Pattern.compile("year" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern MONTH_PATTERN = Pattern.compile("month" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern DAY_PATTERN = Pattern.compile("day" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern HOUR_PATTERN = Pattern.compile("hour" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern BUCKET_PATTERN = Pattern.compile("bucket" + FUNCTION_ARGUMENT_NAME_AND_INT);
    private static final Pattern TRUNCATE_PATTERN = Pattern.compile("truncate" + FUNCTION_ARGUMENT_NAME_AND_INT);
    // private static final Pattern ICEBERG_BUCKET_PATTERN = Pattern.compile("bucket\\[(\\d+)]");
    // private static final Pattern ICEBERG_TRUNCATE_PATTERN = Pattern.compile("truncate\\[(\\d+)]");
    private static final String CATALOG_NAME = "icebergCatalog";
    private static final String DBNAME = "icebergDB";
    private static final String TABLE_NAME = "orders";
    private static final String WAREHOUSE = "hdfs://10.0.30.10:9000/user/hive/warehouse";

    private static Schema SCHEMA;
    private static PartitionSpec SPEC;
    private static Map<String, String> properties = new HashMap<>();
    private static Configuration conf;
    private static HadoopCatalog catalog;
    public CreateTableTest() {
        conf = new Configuration();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        catalog = new HadoopCatalog(conf, WAREHOUSE);
        // Create tables in Iceberg format version 2 (supports row-level deletes).
        properties.put("format-version", "2");
    }
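    /**
     * Builds the schema and partition spec, then creates the table inside a
     * single transaction so the metadata commit is atomic.
     */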
    @Test
    public void createTable() {
        prepareCreateTable();
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME);
        Transaction txn = catalog.buildTable(tableIdent, SCHEMA)
                .withPartitionSpec(SPEC)
                .withProperties(properties)
                .createTransaction();
        txn.commitTransaction();
    }
    @Test
    public void showTable() {
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME);
        Table table = catalog.loadTable(tableIdent);
        System.out.println(table);
    }
    public Table getTable(TableIdentifier identifier) {
        return catalog.loadTable(identifier);
    }
    @Test
    public void dropTable() {
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME);
        // The second argument purges the table's data and metadata files as well.
        catalog.dropTable(tableIdent, true);
    }
    public Table getTable(String dbName, String tableName) {
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(dbName), tableName);
        return catalog.loadTable(tableIdent);
    }
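    /**
     * Prepares the static schema and partition spec used by {@link #createTable()};
     * the table is partitioned by identity on {@code order_date}.
     */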
    public static void prepareCreateTable() {
        SCHEMA = buildSchema();

        List<String> partitionFields = new ArrayList<>();
        partitionFields.add("order_date");
        SPEC = parsePartitionFields(SCHEMA, partitionFields);

        conf = new Configuration();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    }
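    /**
     * Declares the column layout, then lets {@code TypeUtil.assignFreshIds}
     * renumber the field IDs from 1 so they are unique and consecutive.
     */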
    public static Schema buildSchema() {
        Types.StructType structType = Types.StructType.of(
                required(0, "id", Types.LongType.get(), "order_id"),
                required(1, "order_date", Types.DateType.get()),
                required(2, "account_number", Types.LongType.get()),
                required(3, "customer", Types.StringType.get()),
                optional(4, "country", Types.StringType.get(), "customer country"));
        // The IDs declared above are placeholders; assign fresh IDs starting at 1.
        AtomicInteger nextFieldId = new AtomicInteger(1);
        Type icebergSchema = TypeUtil.assignFreshIds(structType, nextFieldId::getAndIncrement);
        return new Schema(icebergSchema.asStructType().fields());
    }
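    /**
     * Translates a list of textual partition declarations (e.g. "order_date",
     * "bucket(customer, 16)") into an Iceberg {@link PartitionSpec}.
     */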
    public static PartitionSpec parsePartitionFields(Schema schema, List<String> fields) {
        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
        for (String field : fields) {
            parsePartitionField(builder, field);
        }
        return builder.build();
    }
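    /**
     * Tries each transform pattern in turn; the first pattern that matches the
     * whole declaration registers the corresponding transform on the builder.
     */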
    public static void parsePartitionField(PartitionSpec.Builder builder, String field) {
        @SuppressWarnings("PointlessBooleanExpression")
        boolean matched = false
                || tryMatch(field, IDENTITY_PATTERN, match -> builder.identity(match.group()))
                || tryMatch(field, YEAR_PATTERN, match -> builder.year(match.group(1)))
                || tryMatch(field, MONTH_PATTERN, match -> builder.month(match.group(1)))
                || tryMatch(field, DAY_PATTERN, match -> builder.day(match.group(1)))
                || tryMatch(field, HOUR_PATTERN, match -> builder.hour(match.group(1)))
                || tryMatch(field, BUCKET_PATTERN, match -> builder.bucket(match.group(1), parseInt(match.group(2))))
                || tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(2))));
        if (!matched) {
            throw new IllegalArgumentException("Invalid partition field declaration: " + field);
        }
    }
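    // A minimal sketch, not part of the original tests: demonstrates the
    // transform syntaxes the parser accepts, using fields from buildSchema().
    @Test
    public void parsePartitionFieldsExample() {
        Schema schema = buildSchema();
        PartitionSpec spec = parsePartitionFields(schema,
                Arrays.asList("year(order_date)", "bucket(customer, 16)", "truncate(country, 2)"));
        System.out.println(spec);
    }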
    // Runs the pattern against the whole value; on a full match, hands the
    // MatchResult to the consumer and reports success.
    private static boolean tryMatch(CharSequence value, Pattern pattern, Consumer<MatchResult> match) {
        Matcher matcher = pattern.matcher(value);
        if (matcher.matches()) {
            match.accept(matcher.toMatchResult());
            return true;
        }
        return false;
    }
}