import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import static java.lang.Integer.parseInt;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
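/**
 * Smoke tests for creating, loading, and dropping an Iceberg table through a
 * {@link HadoopCatalog}, using a small regex-based DSL to declare partition
 * transforms (identity, year/month/day/hour, bucket, truncate).
 */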
public class CreateTableTest {
    private static final String NAME = "[a-z_][a-z0-9_]*";
    private static final String FUNCTION_ARGUMENT_NAME = "\\((" + NAME + ")\\)";
    private static final String FUNCTION_ARGUMENT_NAME_AND_INT = "\\((" + NAME + "), *(\\d+)\\)";
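    // Accepted partition-field forms, matched in order below:
    //   "order_date"                 -> identity transform
    //   "year(order_date)"           -> year/month/day/hour transforms
    //   "bucket(account_number, 16)" -> hash bucket with a width argument
    //   "truncate(customer, 4)"      -> truncate with a width argument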
    private static final Pattern IDENTITY_PATTERN = Pattern.compile(NAME);
    private static final Pattern YEAR_PATTERN = Pattern.compile("year" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern MONTH_PATTERN = Pattern.compile("month" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern DAY_PATTERN = Pattern.compile("day" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern HOUR_PATTERN = Pattern.compile("hour" + FUNCTION_ARGUMENT_NAME);
    private static final Pattern BUCKET_PATTERN = Pattern.compile("bucket" + FUNCTION_ARGUMENT_NAME_AND_INT);
    private static final Pattern TRUNCATE_PATTERN = Pattern.compile("truncate" + FUNCTION_ARGUMENT_NAME_AND_INT);
    // private static final Pattern ICEBERG_BUCKET_PATTERN = Pattern.compile("bucket\\[(\\d+)]");
    // private static final Pattern ICEBERG_TRUNCATE_PATTERN = Pattern.compile("truncate\\[(\\d+)]");
    private static final String CATALOG_NAME = "icebergCatalog";
    private static final String DBNAME = "icebergDB";
    private static final String TABLE_NAME = "orders";
    private static final String WAREHOUSE = "hdfs://10.0.30.10:9000/user/hive/warehouse";
    private static Schema SCHEMA;
    private static PartitionSpec SPEC;
    private static final Map<String, String> properties = new HashMap<>();
    private static Configuration conf;
    private static HadoopCatalog catalog;

    public CreateTableTest() {
        // JUnit 4 instantiates the test class for every test method, so the
        // catalog is (re)built before each test.
        conf = new Configuration();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        catalog = new HadoopCatalog(conf, WAREHOUSE);
        properties.put("format-version", "2");
    }
    @Test
    public void createTable() {
        prepareCreateTable();
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME);
        // Stage the table creation in a transaction and commit it atomically.
        Transaction txn = catalog.buildTable(tableIdent, SCHEMA)
                .withPartitionSpec(SPEC)
                .withProperties(properties)
                .createTransaction();
        txn.commitTransaction();
    }
    @Test
    public void showTable() {
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME);
        Table table = catalog.loadTable(tableIdent);
        System.out.println(table);
    }

    public Table getTable(TableIdentifier identifier) {
        return catalog.loadTable(identifier);
    }

    @Test
    public void dropTable() {
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(CATALOG_NAME, DBNAME), TABLE_NAME);
        // The second argument purges the table's data and metadata files as well.
        catalog.dropTable(tableIdent, true);
    }

    public Table getTable(String dbName, String tableName) {
        TableIdentifier tableIdent = TableIdentifier.of(Namespace.of(dbName), tableName);
        return catalog.loadTable(tableIdent);
    }
    public static void prepareCreateTable() {
        SCHEMA = buildSchema();
        List<String> partitionFields = new ArrayList<>();
        partitionFields.add("order_date");
        SPEC = parsePartitionFields(SCHEMA, partitionFields);
        conf = new Configuration();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    }
    public static Schema buildSchema() {
        Types.StructType structType = Types.StructType.of(
                required(0, "id", Types.LongType.get(), "order_id"),
                required(1, "order_date", Types.DateType.get()),
                required(2, "account_number", Types.LongType.get()),
                required(3, "customer", Types.StringType.get()),
                optional(4, "country", Types.StringType.get(), "customer country")
        );
        // The literal IDs above are placeholders; assignFreshIds renumbers all
        // fields starting from 1 so the schema gets consistent, unique IDs.
        AtomicInteger nextFieldId = new AtomicInteger(1);
        Type icebergSchema = TypeUtil.assignFreshIds(structType, nextFieldId::getAndIncrement);
        return new Schema(icebergSchema.asStructType().fields());
    }
    public static PartitionSpec parsePartitionFields(Schema schema, List<String> fields) {
        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
        for (String field : fields) {
            parsePartitionField(builder, field);
        }
        return builder.build();
    }
    public static void parsePartitionField(PartitionSpec.Builder builder, String field) {
        // Try each transform pattern in turn; the leading "false ||" only
        // keeps the alternatives visually aligned.
        @SuppressWarnings("PointlessBooleanExpression")
        boolean matched = false ||
                tryMatch(field, IDENTITY_PATTERN, match -> builder.identity(match.group())) ||
                tryMatch(field, YEAR_PATTERN, match -> builder.year(match.group(1))) ||
                tryMatch(field, MONTH_PATTERN, match -> builder.month(match.group(1))) ||
                tryMatch(field, DAY_PATTERN, match -> builder.day(match.group(1))) ||
                tryMatch(field, HOUR_PATTERN, match -> builder.hour(match.group(1))) ||
                tryMatch(field, BUCKET_PATTERN, match -> builder.bucket(match.group(1), parseInt(match.group(2)))) ||
                tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(2))));
        if (!matched) {
            throw new IllegalArgumentException("Invalid partition field declaration: " + field);
        }
    }
    private static boolean tryMatch(CharSequence value, Pattern pattern, Consumer<MatchResult> match) {
        // matches() requires the whole string to match, so "year(order_date)"
        // cannot be mistaken for an identity column name.
        Matcher matcher = pattern.matcher(value);
        if (matcher.matches()) {
            match.accept(matcher.toMatchResult());
            return true;
        }
        return false;
    }
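    // A minimal sketch (not one of the original tests) showing how the DSL
    // maps onto Iceberg transforms; the field names come from buildSchema()
    // above and the widths 16 and 4 are arbitrary example values.
    @Test
    public void parseTransformFields() {
        Schema schema = buildSchema();
        List<String> fields = new ArrayList<>();
        fields.add("year(order_date)");           // yearly partitions on a date column
        fields.add("bucket(account_number, 16)"); // 16 hash buckets on a long column
        fields.add("truncate(customer, 4)");      // first 4 characters of a string column
        PartitionSpec spec = parsePartitionFields(schema, fields);
        System.out.println(spec);
    }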
}