网站功能需求用什么做,设计师值得拥有的设计导航,装修风格有哪些,手机网站底部导航一、背景
序列化是指将数据从内存中的对象序列化为字节流#xff0c;以便在网络中传输或持久化存储。序列化在Apache Flink中非常重要#xff0c;因为它涉及到数据传输和状态管理等关键部分。Apache Flink以其独特的方式来处理数据类型以及序列化#xff0c;这种方式包括它…一、背景
序列化是指将数据从内存中的对象序列化为字节流以便在网络中传输或持久化存储。序列化在Apache Flink中非常重要因为它涉及到数据传输和状态管理等关键部分。Apache Flink以其独特的方式来处理数据类型以及序列化这种方式包括它自身的类型描述符、泛型类型提取以及类型序列化框架。本文将简单介绍它们背后的概念和基本原理侧重分享在DataStream、Flink SQL自定义函数开发中对数据类型和序列的应用以提升任务的运行效率。
二、简单理论阐述(基于Flink 1.13)
主要参考Apache Flink 1.13
支持的数据类型
Java Tuples and Scala Case ClassesJava POJOsPrimitive TypesRegular ClassesValuesHadoop WritablesSpecial Types
具体的数据类型定义在此就不详细介绍了具体描述可以前往Flink官网查看。
TypeInformation
Apache Flink量身定制了一套序列化框架好处就是选择自己定制的序列化框架对类型信息了解越多可以在早期完成类型检查更好地选取序列化方式进行数据布局节省数据的存储空间甚至直接操作二进制数据。
TypeInformation类是Apache Flink所有类型描述符的基类通过阅读源码我们可以大概分成以下几种数据类型。
Basic types所有的Java类型以及包装类voidStringDateBigDecimaland BigInteger等。Primitive arrays以及Object arraysComposite typesFlink Java TuplesFlink Java API的一部分最多25个字段不支持空字段Scala case classes包括Scala Tuples不支持null字段Row具有任意数量字段并支持空字段的TuplesPOJO 类JavaBeansAuxiliary types (OptionEitherListsMaps…)Generic typesFlink内部未维护的类型这种类型通常是由Kryo序列化。
我们简单看下该类的方法核心是createSerializer获取org.apache.flink.api.common.typeutils.TypeSerializer执行序列化以及反序列化方法主要是
org.apache.flink.api.common.typeutils.TypeSerializer#serializeorg.apache.flink.api.common.typeutils.TypeSerializer#deserialize(org.apache.flink.core.memory.DataInputView) 何时需要数据类型获取
在Apache Flink中算子间的数据类型传递是通过流处理的数据流来实现的。数据流可以在算子之间流动每个算子对数据流进行处理并产生输出。当数据流从一个算子流向另一个算子时数据的类型也会随之传递。Apache Flink使用自动类型推断机制来确定数据流中的数据类型。在算子之间传递数据时Apache Flink会根据上下文自动推断数据的类型并在运行时保证数据的类型一致性。
举个例子新增一个kafka source这个时候我们需要指定数据输出类型。
Experimental
public OUT DataStreamSourceOUT fromSource(SourceOUT, ?, ? source,WatermarkStrategyOUT timestampsAndWatermarks,String sourceName,TypeInformationOUT typeInfo) {final TypeInformationOUT resolvedTypeInfo getTypeInfo(source, sourceName, Source.class, typeInfo);return new DataStreamSource(this,checkNotNull(source, source),checkNotNull(timestampsAndWatermarks, timestampsAndWatermarks),checkNotNull(resolvedTypeInfo),checkNotNull(sourceName));
}那输入类型怎么不需要指定呢可以简单看下OneInputTransformation单输入算子的基类类的getInputType()方法就是以输入算子的输出类型为输入类型的。
/** Returns the {code TypeInformation} for the elements of the input. */
public TypeInformationIN getInputType() {return input.getOutputType();
}这样source的输出类型会变成下一个算子的输入。整个DAG的数据类型都会传递下去。Apache Flink获取到数据类型后就可以获取对应的序列化方法。
还有一种情况就是与状态后端交互的时候需要获取数据类型特别是非JVM堆存储的后端需要频繁的序列化以及反序列化例如RocksDBStateBackend。
举个例子当我们使用ValueState时需要调用以下API
org.apache.flink.streaming.api.operators.StreamingRuntimeContext#getState
Override
public T ValueStateT getState(ValueStateDescriptorT stateProperties) {KeyedStateStore keyedStateStore checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(getExecutionConfig());return keyedStateStore.getState(stateProperties);
}public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {if (serializerAtomicReference.get() null) {checkState(typeInfo ! null, no serializer and no type info);// try to instantiate and set the serializerTypeSerializerT serializer typeInfo.createSerializer(executionConfig);// use cas to assure the singletonif (!serializerAtomicReference.compareAndSet(null, serializer)) {LOG.debug(Someone else beat us at initializing the serializer.);}}
}可以从org.apache.flink.api.common.state.StateDescriptor#initializeSerializerUnlessSet方法看出需要通过传入的数据类型来获取具体的序列化器。来执行具体的序列化和反序列化逻辑完成数据的交互。
数据类型的自动推断
乍一看很复杂各个环节都需要指定数据类型。其实大部分应用场景下我们不用关注数据的类型以及序列化方式。Flink会尝试推断有关分布式计算期间交换和存储的数据类型的信息。
这里简单介绍Flink类型自动推断的核心类
org.apache.flink.api.java.typeutils.TypeExtractor
在数据流操作中Flink使用了泛型来指定输入和输出的类型。例如DataStream表示一个具有类型T的数据流。在代码中使用的泛型类型参数T会被TypeExtractor类解析和推断。在运行时Apache Flink会通过调用TypeExtractor的静态方法来分析操作的输入和输出并将推断出的类型信息存储在运行时的环境中。
举个例子用的最多的flatMap算子当我们不指定返回类型的时候Flink会调用TypeExtractor类自动去推断用户的类型。
public R SingleOutputStreamOperatorR flatMap(FlatMapFunctionT, R flatMapper) {TypeInformationR outType TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);return this.flatMap(flatMapper, outType);
}一般看开源框架某个类的功能我都会先看类的注释也看TypeExtractor的注释大概意思这是一个对类进行反射分析的实用程序用于确定返回的数据类型。
/*** A utility for reflection analysis on classes, to determine the return type of implementations of* transformation functions.** pNOTES FOR USERS OF THIS CLASS: Automatic type extraction is a hacky business that depends on a* lot of variables such as generics, compiler, interfaces, etc. The type extraction fails regularly* with either {link MissingTypeInfo} or hard exceptions. Whenever you use methods of this class,* make sure to provide a way to pass custom type information as a fallback.*/我们来看下其中一个核心的静态推断逻辑org.apache.flink.api.java.typeutils.TypeExtractor#getUnaryOperatorReturnType
PublicEvolving
public static IN, OUT TypeInformationOUT getUnaryOperatorReturnType(Function function,Class? baseClass,int inputTypeArgumentIndex,int outputTypeArgumentIndex,int[] lambdaOutputTypeArgumentIndices,TypeInformationIN inType,String functionName,boolean allowMissing) {Preconditions.checkArgument(inType null || inputTypeArgumentIndex 0,Input type argument index was not provided);Preconditions.checkArgument(outputTypeArgumentIndex 0, Output type argument index was not provided);Preconditions.checkArgument(lambdaOutputTypeArgumentIndices ! null,Indices for output type arguments within lambda not provided);// explicit result type has highest precedenceif (function instanceof ResultTypeQueryable) {return ((ResultTypeQueryableOUT) function).getProducedType();}// perform extractiontry {final LambdaExecutable exec;try {exec checkAndExtractLambda(function);} catch (TypeExtractionException e) {throw new InvalidTypesException(Internal error occurred., e);}if (exec ! null) {// parameters must be accessed from behind, since JVM can add additional parameters// e.g. when using local variables inside lambda function// paramLen is the total number of parameters of the provided lambda, it includes// parameters added through closurefinal int paramLen exec.getParameterTypes().length;final Method sam TypeExtractionUtils.getSingleAbstractMethod(baseClass);// number of parameters the SAM of implemented interface has; the parameter indexing// applies to this rangefinal int baseParametersLen sam.getParameterTypes().length;final Type output;if (lambdaOutputTypeArgumentIndices.length 0) {output TypeExtractionUtils.extractTypeFromLambda(baseClass,exec,lambdaOutputTypeArgumentIndices,paramLen,baseParametersLen);} else {output exec.getReturnType();TypeExtractionUtils.validateLambdaType(baseClass, output);}return new TypeExtractor().privateCreateTypeInfo(output, inType, null);} else {if (inType ! null) {validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);}return new TypeExtractor().privateCreateTypeInfo(baseClass,function.getClass(),outputTypeArgumentIndex,inType,null);}} catch (InvalidTypesException e) {if (allowMissing) {return (TypeInformationOUT)new MissingTypeInfo(functionName ! null ? functionName : function.toString(), e);} else {throw e;}}
}首先判断该算子是否实现了ResultTypeQueryable接口本质上就是用户是否显式指定了数据类型例如我们熟悉的Kafka source就实现了该方法当使用了JSONKeyValueDeserializationSchema就显式指定了类型用户自定义Schema就需要自己指定。
public class KafkaSourceOUTimplements SourceOUT, KafkaPartitionSplit, KafkaSourceEnumState,ResultTypeQueryableOUT
//deserializationSchema 是需要用户自己定义的。
Override
public TypeInformationOUT getProducedType() {return deserializationSchema.getProducedType();
}
//JSONKeyValueDeserializationSchema
Override
public TypeInformationObjectNode getProducedType() {return getForClass(ObjectNode.class);
}未实现ResultTypeQueryable接口就会通过反射的方法获取ReturnType判断逻辑大概是从是否是Java 8 lambda方法开始判断的。获取到返回类型后再通过new TypeExtractor()).privateCreateTypeInfo(outputinType(TypeInformation)null)封装成Flink内部能识别的数据类型大致分为2类泛型类型变量TypeVariable以及非泛型类型变量。这个封装的过程也是非常重要的推断的数据类型是Flink内部封装好的类型序列化基本都很高效如果不是 就会推断为GenericTypeInfo走Kryo等序列化方式。如感兴趣可以看下这块的源码在此不再赘述。
通过以上的代码逻辑的阅读我们大概能总结出以下结论Flink内部维护了很多高效的序列化方式通常只有数据类型被推断为org.apache.flink.api.java.typeutils.GenericTypeInfo时我们才需要自定义序列化类型否则性能就是灾难或者无法推断类型的时候例如Flink SQL复杂类型有时候是无法自动推断类型的当然某些特殊的对象Kryo也无法序列化比如之前遇到过TreeMap无法Kryo序列化 (也可能是自己姿势不对)建议在开发Apache Flink作业时可以养成显式指定数据类型的好习惯。
三、开发实践
Flink代码作业
如何显式指定数据类型
这个简单了几乎所有的source、Keyby、算子等都暴露了指定TypeInformation typeInfo的构造方法以下简单列举几个
source
Experimental
public OUT DataStreamSourceOUT fromSource(SourceOUT, ?, ? source, WatermarkStrategyOUT timestampsAndWatermarks, String sourceName, TypeInformationOUT typeInfo) {TypeInformationOUT resolvedTypeInfo this.getTypeInfo(source, sourceName, Source.class, typeInfo);return new DataStreamSource(this, (Source)Preconditions.checkNotNull(source, source), (WatermarkStrategy)Preconditions.checkNotNull(timestampsAndWatermarks, timestampsAndWatermarks), (TypeInformation)Preconditions.checkNotNull(resolvedTypeInfo), (String)Preconditions.checkNotNull(sourceName));
}map
public R SingleOutputStreamOperatorR map(MapFunctionT, R mapper, TypeInformationR outputType) {return transform(Map, outputType, new StreamMap(clean(mapper)));
}自定义Operator
PublicEvolving
public R SingleOutputStreamOperatorR transform(String operatorName, TypeInformationR outTypeInfo, OneInputStreamOperatorT, R operator) {return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}keyBy
public K KeyedStreamT, K keyBy(KeySelectorT, K key, TypeInformationK keyType) {Preconditions.checkNotNull(key);Preconditions.checkNotNull(keyType);return new KeyedStream(this, (KeySelector)this.clean(key), keyType);
}状态后端
public ValueStateDescriptor(String name, TypeInformationT typeInfo) {super(name, typeInfo, (Object)null);
}自定义数据类型自定义序列化器
当遇到复杂数据类型或者需要优化任务性能时需要自定义数据类型以下分享几种场景以及实现代码
POJO类
例如大家最常用的POJO类何为POJO类大家可以自行查询Flink对POJO类做了大量的优化大家使用Java对象最好满足POJO的规范。
举个例子这是一个典型的POJO类
Data
public class BroadcastConfig implements Serializable {public String config_type;public String date;public String media_id;public String account_id;public String label_id;public long start_time;public long end_time;public int interval;public String msg;public BroadcastConfig() {}}我们可以这样指定其数据类型返回的数据类就是一个TypeInformation
HashMapString, TypeInformation? pojoFieldName new HashMap();
pojoFieldName.put(config_type, Types.STRING);
pojoFieldName.put(date, Types.STRING);
pojoFieldName.put(media_id, Types.STRING);
pojoFieldName.put(account_id, Types.STRING);
pojoFieldName.put(label_id, Types.STRING);
pojoFieldName.put(start_time, Types.LONG);
pojoFieldName.put(end_time, Types.LONG);
pojoFieldName.put(interval, Types.INT);
pojoFieldName.put(msg, Types.STRING);return Types.POJO(BroadcastConfig.class,pojoFieldName
);如感兴趣可以看下org.apache.flink.api.java.typeutils.runtime.PojoSerializer看Flink本身对其做了哪些优化。
自定义TypeInformation
某些特殊场景可能还需要复杂的对象例如需要极致的性能优化在Flink Table Api中数据对象传输大部分都是BinaryRowdata效率非常高。我们在Flink Datastram代码作业中也想使用怎么操作呢这里分享一种实现方式——自定义TypeInformation当然还有更优的实现方式这里就不介绍了。
代码实现本质上就是继承TypeInformation实现对应的方法。核心逻辑是createSerializer()方法这里我们直接使用Table Api中已经实现的BinaryRowDataSerializer就可以达到同Flink SQL相同的性能优化。
public class BinaryRowDataTypeInfo extends TypeInformationBinaryRowData {private static final long serialVersionUID 4786289562505208256L;private final int numFields;private final ClassBinaryRowData clazz;private final TypeSerializerBinaryRowData serializer;public BinaryRowDataTypeInfo(int numFields) {this.numFieldsnumFields;this.clazzBinaryRowData.class;serializer new BinaryRowDataSerializer(numFields);}Overridepublic boolean isBasicType() {return false;}Overridepublic boolean isTupleType() {return false;}Overridepublic int getArity() {return numFields;}Overridepublic int getTotalFields() {return numFields;}Overridepublic ClassBinaryRowData getTypeClass() {return this.clazz;}Overridepublic boolean isKeyType() {return false;}Overridepublic TypeSerializerBinaryRowData createSerializer(ExecutionConfig config) {return serializer;}Overridepublic String toString() {return BinaryRowDataTypeInfo clazz.getCanonicalName() ;}Overridepublic boolean equals(Object obj) {if (obj instanceof BinaryRowDataTypeInfo) {BinaryRowDataTypeInfo that (BinaryRowDataTypeInfo) obj;return that.canEqual(this) this.numFieldsthat.numFields;} else {return false;}}Overridepublic int hashCode() {return Objects.hash(this.clazz,serializer.hashCode());}Overridepublic boolean canEqual(Object obj) {return obj instanceof BinaryRowDataTypeInfo;}
}所以这里建议Apache Flink代码作业开发可以尽可能使用已经优化好的数据类型例如BinaryRowdata可以用于高性能的数据处理场景例如在内存中进行批处理或流式处理。由于数据以二进制形式存储可以更有效地使用内存和进行数据序列化。同时BinaryRowData还提供了一组方法用于访问和操作二进制数据。
自定义TypeSerializer
上面的例子只是自定义了TypeInformation当然还会遇到自定义TypeSerializer的场景例如Apache Flink本身没有封装的数据类型。
代码实现这里以位图存储Roaring64Bitmap为例在某些特殊场景可以使用bitmap精准去重减少存储空间。
我们需要继承TypeSerializer实现其核心逻辑也是serialize() 、deserialize() 方法可以使用Roaring64Bitmap自带的序列化、反序列化方法。如果你使用的复杂对象没有提供序列化方法你也可以自己实现或者找开源的序列化器。有了自定义的TypeSerializer就可以在你自定义的TypeInformation中调用。
public class Roaring64BitmapTypeSerializer extends TypeSerializerRoaring64Bitmap {/*** Sharable instance of the Roaring64BitmapTypeSerializer.*/public static final Roaring64BitmapTypeSerializer INSTANCE new Roaring64BitmapTypeSerializer();private static final long serialVersionUID -8544079063839253971L;Overridepublic boolean isImmutableType() {return false;}Overridepublic TypeSerializerRoaring64Bitmap duplicate() {return this;}Overridepublic Roaring64Bitmap createInstance() {return new Roaring64Bitmap();}Overridepublic Roaring64Bitmap copy(Roaring64Bitmap from) {Roaring64Bitmap copiedMap new Roaring64Bitmap();from.forEach(copiedMap::addLong);return copiedMap;}Overridepublic Roaring64Bitmap copy(Roaring64Bitmap from, Roaring64Bitmap reuse) {from.forEach(reuse::addLong);return reuse;}Overridepublic int getLength() {return -1;}Overridepublic void serialize(Roaring64Bitmap record, DataOutputView target) throws IOException {record.serialize(target);}Overridepublic Roaring64Bitmap deserialize(DataInputView source) throws IOException {Roaring64Bitmap navigableMap new Roaring64Bitmap();navigableMap.deserialize(source);return navigableMap;}Overridepublic Roaring64Bitmap deserialize(Roaring64Bitmap reuse, DataInputView source) throws IOException {reuse.deserialize(source);return reuse;}Overridepublic void copy(DataInputView source, DataOutputView target) throws IOException {Roaring64Bitmap deserialize this.deserialize(source);copy(deserialize);}Overridepublic boolean equals(Object obj) {if (obj this) {return true;} else if (obj ! null obj.getClass() Roaring64BitmapTypeSerializer.class) {return true;} else {return false;}}Overridepublic int hashCode() {return this.getClass().hashCode();}Overridepublic TypeSerializerSnapshotRoaring64Bitmap snapshotConfiguration() {return new Roaring64BitmapTypeSerializer.Roaring64BitmapSerializerSnapshot();}public static final class Roaring64BitmapSerializerSnapshotextends SimpleTypeSerializerSnapshotRoaring64Bitmap {public Roaring64BitmapSerializerSnapshot() {super(() - Roaring64BitmapTypeSerializer.INSTANCE);}}
}Flink SQL自定义函数
如何显式指定数据类型
这里简单分享下在自定义Function开发下遇到复杂数据类型无法在accumulator 或者input、output中使用的问题这里我们只介绍使用复杂数据对象如何指定数据类型的场景。
我们可以先看下FunctionDefinitionConvertRule这是Apache Flink中的一个规则Rule用于将用户自定义的函数定义转换为对应的实现。其中通过getTypeInference()方法返回用于执行对此函数定义的调用的类型推理的逻辑。
Override
public OptionalRexNode convert(CallExpression call, ConvertContext context) {FunctionDefinition functionDefinition call.getFunctionDefinition();// built-in functions without implementation are handled separatelyif (functionDefinition instanceof BuiltInFunctionDefinition) {final BuiltInFunctionDefinition builtInFunction (BuiltInFunctionDefinition) functionDefinition;if (!builtInFunction.getRuntimeClass().isPresent()) {return Optional.empty();}}TypeInference typeInference functionDefinition.getTypeInference(context.getDataTypeFactory());if (typeInference.getOutputTypeStrategy() TypeStrategies.MISSING) {return Optional.empty();}switch (functionDefinition.getKind()) {case SCALAR:case TABLE:ListRexNode args call.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());final BridgingSqlFunction sqlFunction BridgingSqlFunction.of(context.getDataTypeFactory(),context.getTypeFactory(),SqlKind.OTHER_FUNCTION,call.getFunctionIdentifier().orElse(null),functionDefinition,typeInference);return Optional.of(context.getRelBuilder().call(sqlFunction, args));default:return Optional.empty();}
}那我们指定复杂类型也会从通过该方法实现不多说了直接上代码实现。
指定accumulatorType
这是之前写的AbstractLastValueWithRetractAggFunction功能主要是为了实现具有local-global的逻辑的LastValue提升作业性能。
accumulator对象LastValueWithRetractAccumulator可以看到该对象是一个非常复杂的对象包含5个属性还有List 复杂嵌套以及MapView等可以操作状态后端的对象甚至有Object这种通用的对象。
public static class LastValueWithRetractAccumulator {public Object lastValue null;public Long lastOrder null;public ListTuple2Object, Long retractList new ArrayList();public MapViewObject, ListLong valueToOrderMap new MapView();public MapViewLong, ListObject orderToValueMap new MapView();Overridepublic boolean equals(Object o) {if (this o) {return true;}if (!(o instanceof LastValueWithRetractAccumulator)) {return false;}LastValueWithRetractAccumulator that (LastValueWithRetractAccumulator) o;return Objects.equals(lastValue, that.lastValue) Objects.equals(lastOrder, that.lastOrder) Objects.equals(retractList, that.retractList) valueToOrderMap.equals(that.valueToOrderMap) orderToValueMap.equals(that.orderToValueMap);}Overridepublic int hashCode() {return Objects.hash(lastValue, lastOrder, valueToOrderMap, orderToValueMap, retractList);}}getTypeInference() 是FunctionDefinition接口的方法而所有的用户自定义函数都实现了该接口我们只需要重新实现下该方法就可以以下是代码实现。
这里我们还需要用到工具类TypeInference这是Flink中的一个模块用于进行类型推断和类型推理。
可以看出我们在accumulatorTypeStrategy方法中传入了一个构建好的TypeStrategy这里我们将LastValueWithRetractAccumulator定义为了一个STRUCTURED不同的属性定义为具体的数据类型DataTypes工具类提供了很多丰富的对象形式还有万能的RAW类型。
public TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder().accumulatorTypeStrategy(callContext - {ListDataType dataTypes callContext.getArgumentDataTypes();DataType argDataType;if (dataTypes.get(0).getLogicalType().getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING)) {argDataType DataTypes.STRING();} elseargDataType DataTypeUtils.toInternalDataType(dataTypes.get(0));DataType accDataType DataTypes.STRUCTURED(LastValueWithRetractAccumulator.class,DataTypes.FIELD(lastValue, argDataType.nullable()),DataTypes.FIELD(lastOrder, DataTypes.BIGINT()),DataTypes.FIELD(retractList, DataTypes.ARRAY(DataTypes.STRUCTURED(Tuple2.class,DataTypes.FIELD(f0, argDataType.nullable()),DataTypes.FIELD(f1, DataTypes.BIGINT()))).bridgedTo(List.class)),DataTypes.FIELD(valueToOrderMap,MapView.newMapViewDataType(argDataType.nullable(),DataTypes.ARRAY(DataTypes.BIGINT()).bridgedTo(List.class))),//todo:blink 使用SortedMapView 优化性能开源使用MapView key天然字典升序倒序遍历性能可能不佳DataTypes.FIELD(orderToValueMap,MapView.newMapViewDataType(DataTypes.BIGINT(),DataTypes.ARRAY(argDataType.nullable()).bridgedTo(List.class))));return Optional.of(accDataType);}).build();
}指定outputType
这个也很简单直接上代码实现主要就是outputTypeStrategy中传入需要输出的数据类型即可。
Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder().outputTypeStrategy(callContext - {ListDataType dataTypes callContext.getArgumentDataTypes();DataType argDataType;if (dataTypes.get(0).getLogicalType().getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING)) {argDataType DataTypes.STRING();} elseargDataType DataTypeUtils.toInternalDataType(dataTypes.get(0));return Optional.of(argDataType);}).build();
}指定intputType
在此就不做介绍了同以上类似在inputTypeStrategy方法传入定义好的TypeStrategy就好。
根据inputType动态调整outType或者accumulatorType
在某些场景下我们需要让函数功能性更强比如当我输入是bigint类型的时候我输出bigint类型等类似的逻辑。
大家可以发现outputTypeStrategy或者 accumulatorTypeStrategy的入参都是 实现了 TypeStrategy接口的对象并且需要实现inferType方法。在Flink框架调用该方法的时候会传入一个上下文对象CallContext提供了获取函数入参类型的api getArgumentDataTypes()
代码实现这里的逻辑是将获取到的第一个入参对象的类型指定为输出对象的类型。
.outputTypeStrategy(callContext - {ListDataType dataTypes callContext.getArgumentDataTypes();DataType argDataType;if (dataTypes.get(0).getLogicalType().getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING)) {argDataType DataTypes.STRING();} elseargDataType DataTypeUtils.toInternalDataType(dataTypes.get(0));return Optional.of(argDataType);
}自定义DataType
可以发现以上分享几乎都是使用的DataTypes封装好的类型比如DataTypes.STRING()、DataTypes.Long()等。那如果我们需要封装一些其他对象如何操作呢上文提到DataTypes提供了一个自定义任意类型的方法。
/*** Data type of an arbitrary serialized type. This type is a black box within the table* ecosystem and is only deserialized at the edges.** pThe raw type is an extension to the SQL standard.** pThis method assumes that a {link TypeSerializer} instance is present. Use {link* #RAW(Class)} for automatically generating a serializer.** param clazz originating value class* param serializer type serializer* see RawType*/
public static T DataType RAW(ClassT clazz, TypeSerializerT serializer) {return new AtomicDataType(new RawType(clazz, serializer));
}我们有这样的一个场景需要在自定义的函数中使用bitmap计算UV值需要定义Roaring64Bitmap为accumulatorType直接上代码实现。
这里的Roaring64BitmapTypeSerializer已经在《自定义TypeSerializer》小段中实现有兴趣的同学可以往上翻翻。
public TypeInference getTypeInference(DataTypeFactory typeFactory) {return TypeInference.newBuilder().accumulatorTypeStrategy(callContext - {DataType type DataTypes.RAW(Roaring64Bitmap.class,Roaring64BitmapTypeSerializer.INSTANCE);return Optional.of(type);}).outputTypeStrategy(callContext - Optional.of(DataTypes.BIGINT())).build();
}四、结语
本文主要简单分享了一些自身对Flink类型及序列化的认识和应用实践能力有限不足之处欢迎指正。
引用 https://nightlies.apache.org/flink/flink-docs-release-1.13/
*文/ 木木
本文属得物技术原创更多精彩文章请看得物技术
未经得物技术许可严禁转载否则依法追究法律责任