一、背景
序列化是指将数据从内存中的对象序列化为字节流,以便在网络中传输或持久化存储。序列化在 Apache Flink 中非常重要,因为它涉及到数据传输和状态管理等关键部分。Apache Flink 以其独特的方式来处理数据类型以及序列化,这种方式包括它自身的类型描述符、泛型类型提取以及类型序列化框架。本文将简单介绍它们背后的概念和基本原理,侧重分享在 DataStream、Flink SQL 自定义函数开发中对数据类型和序列的应用,以提升任务的运行效率。
二、简单理论阐述(基于 Flink 1.13)
主要参考 Apache Flink 1.13
支持的数据类型
具体的数据类型定义在此就不详细介绍了,具体描述可以前往 Flink 官网查看。
TypeInformation
Apache Flink 量身定制了一套序列化框架,好处就是选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好地选取序列化方式,进行数据布局,节省数据的存储空间,甚至直接操作二进制数据。
TypeInformation 类是 Apache Flink 所有类型描述符的基类,通过阅读源码,我们可以大概分成以下几种数据类型。
Basic types:所有的 Java 类型以及包装类:void,String,Date,BigDecimal,and BigInteger 等。
Primitive arrays 以及 Object arrays
Composite types
Flink Java Tuples(Flink Java API 的一部分):最多 25 个字段,不支持空字段
Scala case classes(包括 Scala Tuples):不支持 null 字段
Row:具有任意数量字段并支持空字段的 Tuples
POJO 类:JavaBeans
Auxiliary types (Option,Either,Lists,Maps,…)
Generic types:Flink 内部未维护的类型,这种类型通常是由 Kryo 序列化。
我们简单看下该类的方法,核心是 createSerializer,获取 org.apache.flink.api.common.typeutils.TypeSerializer,执行序列化以及反序列化方法,主要是:
何时需要数据类型获取
在 Apache Flink 中,算子间的数据类型传递是通过流处理的数据流来实现的。数据流可以在算子之间流动,每个算子对数据流进行处理并产生输出。当数据流从一个算子流向另一个算子时,数据的类型也会随之传递。Apache Flink 使用自动类型推断机制来确定数据流中的数据类型。在算子之间传递数据时,Apache Flink 会根据上下文自动推断数据的类型,并在运行时保证数据的类型一致性。
举个例子:新增一个 kafka source,这个时候我们需要指定数据输出类型。
@Experimental
public <OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
TypeInformation<OUT> typeInfo) {
final TypeInformation<OUT> resolvedTypeInfo =
getTypeInfo(source, sourceName, Source.class, typeInfo);
return new DataStreamSource<>(
this,
checkNotNull(source, "source"),
checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
checkNotNull(resolvedTypeInfo),
checkNotNull(sourceName));
}
复制代码
那输入类型怎么不需要指定呢?可以简单看下 OneInputTransformation(单输入算子的基类)类的 getInputType()方法,就是以输入算子的输出类型为输入类型的。
/** Returns the {@code TypeInformation} for the elements of the input. */
public TypeInformation<IN> getInputType() {
return input.getOutputType();
}
复制代码
这样 source 的输出类型会变成下一个算子的输入。整个 DAG 的数据类型都会传递下去。Apache Flink 获取到数据类型后,就可以获取对应的序列化方法。
还有一种情况就是与状态后端交互的时候需要获取数据类型,特别是非 JVM 堆存储的后端,需要频繁的序列化以及反序列化,例如 RocksDBStateBackend。
举个例子,当我们使用 ValueState 时需要调用以下 API:
org.apache.flink.streaming.api.operators.StreamingRuntimeContext#getState
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
return keyedStateStore.getState(stateProperties);
}
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
if (serializerAtomicReference.get() == null) {
checkState(typeInfo != null, "no serializer and no type info");
// try to instantiate and set the serializer
TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
// use cas to assure the singleton
if (!serializerAtomicReference.compareAndSet(null, serializer)) {
LOG.debug("Someone else beat us at initializing the serializer.");
}
}
}
复制代码
可以从 org.apache.flink.api.common.state.StateDescriptor#initializeSerializerUnlessSet 方法看出,需要通过传入的数据类型来获取具体的序列化器。来执行具体的序列化和反序列化逻辑,完成数据的交互。
数据类型的自动推断
乍一看很复杂,各个环节都需要指定数据类型。其实大部分应用场景下,我们不用关注数据的类型以及序列化方式。Flink 会尝试推断有关分布式计算期间交换和存储的数据类型的信息。
这里简单介绍 Flink 类型自动推断的核心类:
org.apache.flink.api.java.typeutils.TypeExtractor
在数据流操作中,Flink 使用了泛型来指定输入和输出的类型。例如,DataStream 表示一个具有类型 T 的数据流。在代码中使用的泛型类型参数 T 会被 TypeExtractor 类解析和推断。在运行时,Apache Flink 会通过调用 TypeExtractor 的静态方法来分析操作的输入和输出,并将推断出的类型信息存储在运行时的环境中。
举个例子:用的最多的 flatMap 算子,当我们不指定返回类型的时候,Flink 会调用 TypeExtractor 类自动去推断用户的类型。
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
return this.flatMap(flatMapper, outType);
}
复制代码
一般看开源框架某个类的功能我都会先看类的注释,也看 TypeExtractor 的注释,大概意思这是一个对类进行反射分析的实用程序,用于确定返回的数据类型。
/**
* A utility for reflection analysis on classes, to determine the return type of implementations of
* transformation functions.
*
* <p>NOTES FOR USERS OF THIS CLASS: Automatic type extraction is a hacky business that depends on a
* lot of variables such as generics, compiler, interfaces, etc. The type extraction fails regularly
* with either {@link MissingTypeInfo} or hard exceptions. Whenever you use methods of this class,
* make sure to provide a way to pass custom type information as a fallback.
*/
复制代码
我们来看下其中一个核心的静态推断逻辑,org.apache.flink.api.java.typeutils.TypeExtractor#getUnaryOperatorReturnType
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
Function function,
Class<?> baseClass,
int inputTypeArgumentIndex,
int outputTypeArgumentIndex,
int[] lambdaOutputTypeArgumentIndices,
TypeInformation<IN> inType,
String functionName,
boolean allowMissing) {
Preconditions.checkArgument(
inType == null || inputTypeArgumentIndex >= 0,
"Input type argument index was not provided");
Preconditions.checkArgument(
outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
Preconditions.checkArgument(
lambdaOutputTypeArgumentIndices != null,
"Indices for output type arguments within lambda not provided");
// explicit result type has highest precedence
if (function instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
// perform extraction
try {
final LambdaExecutable exec;
try {
exec = checkAndExtractLambda(function);
} catch (TypeExtractionException e) {
throw new InvalidTypesException("Internal error occurred.", e);
}
if (exec != null) {
// parameters must be accessed from behind, since JVM can add additional parameters
// e.g. when using local variables inside lambda function
// paramLen is the total number of parameters of the provided lambda, it includes
// parameters added through closure
final int paramLen = exec.getParameterTypes().length;
final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
// number of parameters the SAM of implemented interface has; the parameter indexing
// applies to this range
final int baseParametersLen = sam.getParameterTypes().length;
final Type output;
if (lambdaOutputTypeArgumentIndices.length > 0) {
output =
TypeExtractionUtils.extractTypeFromLambda(
baseClass,
exec,
lambdaOutputTypeArgumentIndices,
paramLen,
baseParametersLen);
} else {
output = exec.getReturnType();
TypeExtractionUtils.validateLambdaType(baseClass, output);
}
return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
} else {
if (inType != null) {
validateInputType(
baseClass, function.getClass(), inputTypeArgumentIndex, inType);
}
return new TypeExtractor()
.privateCreateTypeInfo(
baseClass,
function.getClass(),
outputTypeArgumentIndex,
inType,
null);
}
} catch (InvalidTypesException e) {
if (allowMissing) {
return (TypeInformation<OUT>)
new MissingTypeInfo(
functionName != null ? functionName : function.toString(), e);
} else {
throw e;
}
}
}
复制代码
public class KafkaSource<OUT>
implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
ResultTypeQueryable<OUT>
//deserializationSchema 是需要用户自己定义的。
@Override
public TypeInformation<OUT> getProducedType() {
return deserializationSchema.getProducedType();
}
//JSONKeyValueDeserializationSchema
@Override
public TypeInformation<ObjectNode> getProducedType() {
return getForClass(ObjectNode.class);
}
复制代码
通过以上的代码逻辑的阅读,我们大概能总结出以下结论:Flink 内部维护了很多高效的序列化方式,通常只有数据类型被推断为 org.apache.flink.api.java.typeutils.GenericTypeInfo 时我们才需要自定义序列化类型,否则性能就是灾难;或者无法推断类型的时候,例如 Flink SQL 复杂类型有时候是无法自动推断类型的,当然某些特殊的对象 Kryo 也无法序列化,比如之前遇到过 TreeMap 无法 Kryo 序列化 (也可能是自己姿势不对),建议在开发 Apache Flink 作业时可以养成显式指定数据类型的好习惯。
三、开发实践
Flink 代码作业
如何显式指定数据类型
这个简单了,几乎所有的 source、Keyby、算子等都暴露了指定 TypeInformation<OUT> typeInfo 的构造方法,以下简单列举几个:
@Experimental
public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = this.getTypeInfo(source, sourceName, Source.class, typeInfo);
return new DataStreamSource(this, (Source)Preconditions.checkNotNull(source, "source"), (WatermarkStrategy)Preconditions.checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"), (TypeInformation)Preconditions.checkNotNull(resolvedTypeInfo), (String)Preconditions.checkNotNull(sourceName));
}
复制代码
public <R> SingleOutputStreamOperator<R> map(
MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
复制代码
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
复制代码
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(keyType);
return new KeyedStream(this, (KeySelector)this.clean(key), keyType);
}
复制代码
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
super(name, typeInfo, (Object)null);
}
复制代码
自定义数据类型 &自定义序列化器
当遇到复杂数据类型,或者需要优化任务性能时,需要自定义数据类型,以下分享几种场景以及实现代码:
例如大家最常用的 POJO 类,何为 POJO 类大家可以自行查询,Flink 对 POJO 类做了大量的优化,大家使用 Java 对象最好满足 POJO 的规范。
举个例子,这是一个典型的 POJO 类:
@Data
public class BroadcastConfig implements Serializable {
public String config_type;
public String date;
public String media_id;
public String account_id;
public String label_id;
public long start_time;
public long end_time;
public int interval;
public String msg;
public BroadcastConfig() {
}
}
复制代码
我们可以这样指定其数据类型,返回的数据类就是一个 TypeInformation<BroadcastCofig>
HashMap<String, TypeInformation<?>> pojoFieldName = new HashMap<>();
pojoFieldName.put("config_type", Types.STRING);
pojoFieldName.put("date", Types.STRING);
pojoFieldName.put("media_id", Types.STRING);
pojoFieldName.put("account_id", Types.STRING);
pojoFieldName.put("label_id", Types.STRING);
pojoFieldName.put("start_time", Types.LONG);
pojoFieldName.put("end_time", Types.LONG);
pojoFieldName.put("interval", Types.INT);
pojoFieldName.put("msg", Types.STRING);
return Types.POJO(
BroadcastConfig.class,
pojoFieldName
);
复制代码
如感兴趣,可以看下 org.apache.flink.api.java.typeutils.runtime.PojoSerializer,看 Flink 本身对其做了哪些优化。
某些特殊场景可能还需要复杂的对象,例如,需要极致的性能优化,在 Flink Table Api 中数据对象传输,大部分都是 BinaryRowdata,效率非常高。我们在 Flink Datastram 代码作业中也想使用,怎么操作呢?这里分享一种实现方式——自定义 TypeInformation,当然还有更优的实现方式,这里就不介绍了。
代码实现:本质上就是继承 TypeInformation,实现对应的方法。核心逻辑是 createSerializer()方法,这里我们直接使用 Table Api 中已经实现的 BinaryRowDataSerializer,就可以达到同 Flink SQL 相同的性能优化。
public class BinaryRowDataTypeInfo extends TypeInformation<BinaryRowData> {
private static final long serialVersionUID = 4786289562505208256L;
private final int numFields;
private final Class<BinaryRowData> clazz;
private final TypeSerializer<BinaryRowData> serializer;
public BinaryRowDataTypeInfo(int numFields) {
this.numFields=numFields;
this.clazz=BinaryRowData.class;
serializer= new BinaryRowDataSerializer(numFields);
}
@Override
public boolean isBasicType() {
return false;
}
@Override
public boolean isTupleType() {
return false;
}
@Override
public int getArity() {
return numFields;
}
@Override
public int getTotalFields() {
return numFields;
}
@Override
public Class<BinaryRowData> getTypeClass() {
return this.clazz;
}
@Override
public boolean isKeyType() {
return false;
}
@Override
public TypeSerializer<BinaryRowData> createSerializer(ExecutionConfig config) {
return serializer;
}
@Override
public String toString() {
return "BinaryRowDataTypeInfo<" + clazz.getCanonicalName() + ">";
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BinaryRowDataTypeInfo) {
BinaryRowDataTypeInfo that = (BinaryRowDataTypeInfo) obj;
return that.canEqual(this)
&& this.numFields==that.numFields;
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hash(this.clazz,serializer.hashCode());
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof BinaryRowDataTypeInfo;
}
}
复制代码
所以这里建议 Apache Flink 代码作业开发可以尽可能使用已经优化好的数据类型,例如 BinaryRowdata,可以用于高性能的数据处理场景,例如在内存中进行批处理或流式处理。由于数据以二进制形式存储,可以更有效地使用内存和进行数据序列化。同时,BinaryRowData 还提供了一组方法,用于访问和操作二进制数据。
上面的例子只是自定义了 TypeInformation,当然还会遇到自定义 TypeSerializer 的场景,例如 Apache Flink 本身没有封装的数据类型。
代码实现:这里以位图存储 Roaring64Bitmap 为例,在某些特殊场景可以使用 bitmap 精准去重,减少存储空间。
我们需要继承 TypeSerializer,实现其核心逻辑也是 serialize() 、deserialize() 方法,可以使用 Roaring64Bitmap 自带的序列化、反序列化方法。如果你使用的复杂对象没有提供序列化方法,你也可以自己实现或者找开源的序列化器。有了自定义的 TypeSerializer 就可以在你自定义的 TypeInformation 中调用。
public class Roaring64BitmapTypeSerializer extends TypeSerializer<Roaring64Bitmap> {
/**
* Sharable instance of the Roaring64BitmapTypeSerializer.
*/
public static final Roaring64BitmapTypeSerializer INSTANCE = new Roaring64BitmapTypeSerializer();
private static final long serialVersionUID = -8544079063839253971L;
@Override
public boolean isImmutableType() {
return false;
}
@Override
public TypeSerializer<Roaring64Bitmap> duplicate() {
return this;
}
@Override
public Roaring64Bitmap createInstance() {
return new Roaring64Bitmap();
}
@Override
public Roaring64Bitmap copy(Roaring64Bitmap from) {
Roaring64Bitmap copiedMap = new Roaring64Bitmap();
from.forEach(copiedMap::addLong);
return copiedMap;
}
@Override
public Roaring64Bitmap copy(Roaring64Bitmap from, Roaring64Bitmap reuse) {
from.forEach(reuse::addLong);
return reuse;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(Roaring64Bitmap record, DataOutputView target) throws IOException {
record.serialize(target);
}
@Override
public Roaring64Bitmap deserialize(DataInputView source) throws IOException {
Roaring64Bitmap navigableMap = new Roaring64Bitmap();
navigableMap.deserialize(source);
return navigableMap;
}
@Override
public Roaring64Bitmap deserialize(Roaring64Bitmap reuse, DataInputView source) throws IOException {
reuse.deserialize(source);
return reuse;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
Roaring64Bitmap deserialize = this.deserialize(source);
copy(deserialize);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj != null && obj.getClass() == Roaring64BitmapTypeSerializer.class) {
return true;
} else {
return false;
}
}
@Override
public int hashCode() {
return this.getClass().hashCode();
}
@Override
public TypeSerializerSnapshot<Roaring64Bitmap> snapshotConfiguration() {
return new Roaring64BitmapTypeSerializer.Roaring64BitmapSerializerSnapshot();
}
public static final class Roaring64BitmapSerializerSnapshot
extends SimpleTypeSerializerSnapshot<Roaring64Bitmap> {
public Roaring64BitmapSerializerSnapshot() {
super(() -> Roaring64BitmapTypeSerializer.INSTANCE);
}
}
}
复制代码
Flink SQL 自定义函数
如何显式指定数据类型
这里简单分享下,在自定义 Function 开发下遇到复杂数据类型无法在 accumulator 或者 input、output 中使用的问题,这里我们只介绍使用复杂数据对象如何指定数据类型的场景。
我们可以先看下 FunctionDefinitionConvertRule,这是 Apache Flink 中的一个规则(Rule),用于将用户自定义的函数定义转换为对应的实现。其中通过 getTypeInference()方法返回用于执行对此函数定义的调用的类型推理的逻辑。
@Override
public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
FunctionDefinition functionDefinition = call.getFunctionDefinition();
// built-in functions without implementation are handled separately
if (functionDefinition instanceof BuiltInFunctionDefinition) {
final BuiltInFunctionDefinition builtInFunction =
(BuiltInFunctionDefinition) functionDefinition;
if (!builtInFunction.getRuntimeClass().isPresent()) {
return Optional.empty();
}
}
TypeInference typeInference =
functionDefinition.getTypeInference(context.getDataTypeFactory());
if (typeInference.getOutputTypeStrategy() == TypeStrategies.MISSING) {
return Optional.empty();
}
switch (functionDefinition.getKind()) {
case SCALAR:
case TABLE:
List<RexNode> args =
call.getChildren().stream()
.map(context::toRexNode)
.collect(Collectors.toList());
final BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(
context.getDataTypeFactory(),
context.getTypeFactory(),
SqlKind.OTHER_FUNCTION,
call.getFunctionIdentifier().orElse(null),
functionDefinition,
typeInference);
return Optional.of(context.getRelBuilder().call(sqlFunction, args));
default:
return Optional.empty();
}
}
复制代码
那我们指定复杂类型也会从通过该方法实现,不多说了,直接上代码实现。
这是之前写的 AbstractLastValueWithRetractAggFunction 功能主要是为了实现具有 local-global 的逻辑的 LastValue,提升作业性能。
accumulator 对象:LastValueWithRetractAccumulator,可以看到该对象是一个非常复杂的对象,包含 5 个属性,还有 List<Tuple2> 复杂嵌套,以及 MapView 等可以操作状态后端的对象,甚至有 Object 这种通用的对象。
public static class LastValueWithRetractAccumulator {
public Object lastValue = null;
public Long lastOrder = null;
public List<Tuple2<Object, Long>> retractList = new ArrayList<>();
public MapView<Object, List<Long>> valueToOrderMap = new MapView<>();
public MapView<Long, List<Object>> orderToValueMap = new MapView<>();
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof LastValueWithRetractAccumulator)) {
return false;
}
LastValueWithRetractAccumulator that = (LastValueWithRetractAccumulator) o;
return Objects.equals(lastValue, that.lastValue)
&& Objects.equals(lastOrder, that.lastOrder)
&& Objects.equals(retractList, that.retractList)
&& valueToOrderMap.equals(that.valueToOrderMap)
&& orderToValueMap.equals(that.orderToValueMap)
;
}
@Override
public int hashCode() {
return Objects.hash(lastValue, lastOrder, valueToOrderMap, orderToValueMap, retractList);
}
}
复制代码
getTypeInference() 是 FunctionDefinition 接口的方法,而所有的用户自定义函数都实现了该接口,我们只需要重新实现下该方法就可以,以下是代码实现。
这里我们还需要用到工具类 TypeInference,这是 Flink 中的一个模块,用于进行类型推断和类型推理。
可以看出我们在 accumulatorTypeStrategy 方法中传入了一个构建好的 TypeStrategy;这里我们将 LastValueWithRetractAccumulator 定义为了一个 STRUCTURED,不同的属性定义为具体的数据类型,DataTypes 工具类提供了很多丰富的对象形式,还有万能的 RAW 类型。
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.accumulatorTypeStrategy(callContext -> {
List<DataType> dataTypes = callContext.getArgumentDataTypes();
DataType argDataType;
if (dataTypes.get(0)
.getLogicalType()
.getTypeRoot()
.getFamilies()
.contains(LogicalTypeFamily.CHARACTER_STRING)) {
argDataType = DataTypes.STRING();
} else
argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));
DataType accDataType = DataTypes.STRUCTURED(
LastValueWithRetractAccumulator.class,
DataTypes.FIELD("lastValue", argDataType.nullable()),
DataTypes.FIELD("lastOrder", DataTypes.BIGINT()),
DataTypes.FIELD("retractList", DataTypes.ARRAY(
DataTypes.STRUCTURED(
Tuple2.class,
DataTypes.FIELD("f0", argDataType.nullable()),
DataTypes.FIELD("f1", DataTypes.BIGINT())
)).bridgedTo(List.class)),
DataTypes.FIELD(
"valueToOrderMap",
MapView.newMapViewDataType(
argDataType.nullable(),
DataTypes.ARRAY(DataTypes.BIGINT()).bridgedTo(List.class))),
//todo:blink 使用SortedMapView 优化性能,开源使用MapView key天然字典升序,倒序遍历性能可能不佳
DataTypes.FIELD(
"orderToValueMap",
MapView.newMapViewDataType(
DataTypes.BIGINT(),
DataTypes.ARRAY(argDataType.nullable()).bridgedTo(List.class)))
);
return Optional.of(accDataType);
})
.build()
;
}
复制代码
这个也很简单,直接上代码实现,主要就是 outputTypeStrategy 中传入需要输出的数据类型即可。
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(callContext -> {
List<DataType> dataTypes = callContext.getArgumentDataTypes();
DataType argDataType;
if (dataTypes.get(0)
.getLogicalType()
.getTypeRoot()
.getFamilies()
.contains(LogicalTypeFamily.CHARACTER_STRING)) {
argDataType = DataTypes.STRING();
} else
argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));
return Optional.of(argDataType);
})
.build()
;
}
复制代码
在此就不做介绍了,同以上类似,在 inputTypeStrategy 方法传入定义好的 TypeStrategy 就好。
在某些场景下,我们需要让函数功能性更强,比如当我输入是 bigint 类型的时候,我输出 bigint 类型等,类似的逻辑。
大家可以发现 outputTypeStrategy 或者 accumulatorTypeStrategy 的入参都是 实现了 TypeStrategy 接口的对象,并且需要实现 inferType 方法。在 Flink 框架调用该方法的时候会传入一个上下文对象 CallContext,提供了获取函数入参类型的 api getArgumentDataTypes();
代码实现:这里的逻辑是将获取到的第一个入参对象的类型指定为输出对象的类型。
.outputTypeStrategy(callContext -> {
List<DataType> dataTypes = callContext.getArgumentDataTypes();
DataType argDataType;
if (dataTypes.get(0)
.getLogicalType()
.getTypeRoot()
.getFamilies()
.contains(LogicalTypeFamily.CHARACTER_STRING)) {
argDataType = DataTypes.STRING();
} else
argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));
return Optional.of(argDataType);
}
复制代码
自定义 DataType
可以发现以上分享几乎都是使用的 DataTypes 封装好的类型,比如 DataTypes.STRING()、DataTypes.Long()等。那如果我们需要封装一些其他对象如何操作呢?上文提到 DataTypes 提供了一个自定义任意类型的方法。
/**
* Data type of an arbitrary serialized type. This type is a black box within the table
* ecosystem and is only deserialized at the edges.
*
* <p>The raw type is an extension to the SQL standard.
*
* <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link
* #RAW(Class)} for automatically generating a serializer.
*
* @param clazz originating value class
* @param serializer type serializer
* @see RawType
*/
public static <T> DataType RAW(Class<T> clazz, TypeSerializer<T> serializer) {
return new AtomicDataType(new RawType<>(clazz, serializer));
}
复制代码
我们有这样的一个场景,需要在自定义的函数中使用 bitmap 计算 UV 值,需要定义 Roaring64Bitmap 为 accumulatorType,直接上代码实现。
这里的 Roaring64BitmapTypeSerializer 已经在《自定义 TypeSerializer》小段中实现,有兴趣的同学可以往上翻翻。
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.accumulatorTypeStrategy(callContext -> {
DataType type = DataTypes.RAW(
Roaring64Bitmap.class,
Roaring64BitmapTypeSerializer.INSTANCE
);
return Optional.of(type);
})
.outputTypeStrategy(callContext -> Optional.of(DataTypes.BIGINT()))
.build()
;
}
复制代码
四、结语
本文主要简单分享了一些自身对 Flink 类型及序列化的认识和应用实践,能力有限,不足之处欢迎指正。
引用:https://nightlies.apache.org/flink/flink-docs-release-1.13/
*文/ 木木
本文属得物技术原创,更多精彩文章请看:得物技术
未经得物技术许可严禁转载,否则依法追究法律责任!
评论