
Flink SQL Custom Source Format

shengjk1
Published: March 30, 2021

1. Background


The JSON messages in Kafka are nested, and we don't want to deserialize them a second time just to flatten them, so we implement a custom format.


2. Steps


1. Create a custom Factory that implements DeserializationFormatFactory.

2. Create a custom DeserializationSchema that implements DeserializationSchema.

3. Have the custom Factory's createDecodingFormat method return a DecodingFormat whose createRuntimeDecoder creates the custom DeserializationSchema.


3. Custom Format


To keep things simple, we define a NullFormat: no matter what the message in Kafka is, deserialization returns null (which the Kafka connector treats as a record to skip), so the effect is the same as Kafka containing no messages at all.


Custom Factory

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.apache.flink.formats.json.JsonOptions.*;

/**
 * Table format factory for providing configured instances of JSON to RowData
 * {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class NullFormatFactory implements DeserializationFormatFactory {

    // the unique identifier of this Factory
    public static final String IDENTIFIER = "null";

    @SuppressWarnings("unchecked")
    @Override
    // entry point for decoding; this is essentially boilerplate
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateFormatOptions(formatOptions);

        final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, // ScanRuntimeProviderContext
                    DataType producedDataType) {
                // the table's field names and data types
                final RowType rowType = (RowType) producedDataType.getLogicalType();
                final TypeInformation<RowData> rowDataTypeInfo =
                        (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
                return new NullRowDataDeserializationSchema(
                        rowType,
                        rowDataTypeInfo,
                        failOnMissingField,
                        ignoreParseErrors,
                        timestampOption
                );
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FAIL_ON_MISSING_FIELD);
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        return options;
    }

    // ------------------------------------------------------------------------
    // Validation
    // ------------------------------------------------------------------------

    static void validateFormatOptions(ReadableConfig tableOptions) {
        boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
        boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
        String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
        if (ignoreParseErrors && failOnMissingField) {
            throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
                    + " and "
                    + IGNORE_PARSE_ERRORS.key()
                    + " shouldn't both be true.");
        }
        if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)) {
            throw new ValidationException(String.format(
                    "Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
                    timestampFormat, TIMESTAMP_FORMAT.key()));
        }
    }
}
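Note that the factory simply reuses the built-in JSON format's options through the static import of JsonOptions, so fail-on-missing-field, ignore-parse-errors, and timestamp-format can be configured in the DDL just as with the built-in JSON format (format options are prefixed with the format identifier, e.g. 'null.ignore-parse-errors' = 'true').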

Custom DeserializationSchema


import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.io.IOException;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

@Internal
public class NullRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    private static final long serialVersionUID = 1L;

    /** Flag indicating whether to fail if a field is missing. */
    private final boolean failOnMissingField;

    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
    private final boolean ignoreParseErrors;

    /** TypeInformation of the produced {@link RowData}. */
    private final TypeInformation<RowData> resultTypeInfo;

    /** Object mapper for parsing the JSON. */
    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Timestamp format specification which is used to parse timestamps. */
    private final TimestampFormat timestampFormat;

    public NullRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> resultTypeInfo,
            boolean failOnMissingField,
            boolean ignoreParseErrors,
            TimestampFormat timestampFormat) {
        if (ignoreParseErrors && failOnMissingField) {
            throw new IllegalArgumentException(
                    "JSON format doesn't support failOnMissingField and ignoreParseErrors both being enabled.");
        }
        this.resultTypeInfo = checkNotNull(resultTypeInfo);
        this.failOnMissingField = failOnMissingField;
        this.ignoreParseErrors = ignoreParseErrors;
        this.timestampFormat = timestampFormat;
    }

    @Override
    // This is where the real deserialization logic would live, e.g. flattening
    // the JSON (turning nested layers into a single layer). This is the key method.
    public RowData deserialize(byte[] message) throws IOException {
        return null;
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        NullRowDataDeserializationSchema that = (NullRowDataDeserializationSchema) o;
        return failOnMissingField == that.failOnMissingField
                && ignoreParseErrors == that.ignoreParseErrors
                && resultTypeInfo.equals(that.resultTypeInfo)
                && timestampFormat.equals(that.timestampFormat);
    }

    @Override
    public int hashCode() {
        return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    }
}
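The NullFormat's deserialize simply returns null, but in a real format this method is where the flattening would happen. Below is a minimal sketch of what a flattening deserialize might look like; the message shape and field positions are hypothetical, and a real implementation would derive the field lookups and type conversions from the rowType passed to the constructor instead of hard-coding them:

// Hypothetical replacement for deserialize(), assuming messages shaped like
// {"id": 1, "data": {"order_id": 42}} and a table schema (id BIGINT, order_id BIGINT).
// Requires: import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
//           import org.apache.flink.table.data.GenericRowData;
@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        JsonNode root = objectMapper.readTree(message);
        GenericRowData row = new GenericRowData(2);
        row.setField(0, root.get("id").asLong());
        // surface the nested field as a top-level column: this is the "flattening"
        row.setField(1, root.get("data").get("order_id").asLong());
        return row;
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            // skip the malformed record instead of failing the job
            return null;
        }
        throw new IOException("Failed to deserialize JSON '" + new String(message) + "'.", t);
    }
}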


4. Using the Custom Format


public class SqlKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);

        // enable checkpointing
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

        String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint," +
                "info_index int,order_id bigint,tableName String" +
                ") WITH (" +
                "'connector' = 'kafka','topic' = 'canal_monitor_order'," +
                "'properties.bootstrap.servers' = 'bigdata-dev-mq:9092','properties.group.id' = 'testGroup'," +
                "'format' = 'null','scan.startup.mode' = 'earliest-offset')";
        tableEnv.executeSql(sql);
        ......

'format' = 'null' here is the Factory's unique identifier (the IDENTIFIER returned by factoryIdentifier()).
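One step that is easy to miss: Flink discovers format factories through Java's ServiceLoader, so the jar must contain a service file listing the factory class, or the 'null' format will not be found when the table is created. Assuming the class lives in a package such as io.example.format (a placeholder for wherever NullFormatFactory actually lives), the file would look like this:

# contents of META-INF/services/org.apache.flink.table.factories.Factory
io.example.format.NullFormatFactory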

After that, the job can be executed directly.
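As a quick sanity check (a hypothetical continuation, not part of the original job above), querying the table should run cleanly yet never emit a row, since every record deserializes to null and is skipped:

tableEnv.executeSql("SELECT * FROM sourcedata").print(); // keeps running, but no rows appear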

