天津网站建设哪个好,asp 开发的大型网站,浙江众安建设集团有限公司网站,设计网站建设价格文章目录 出现场景#xff1a;表现#xff1a;问题#xff1a;解决#xff1a; tombstone : Kafka中提供了一个墓碑消息#xff08;tombstone#xff09;的概念#xff0c;如果一条消息的key不为null#xff0c;但是其value为null#xff0c;那么此消息就是墓碑消息. … 文章目录 出现场景表现问题解决 tombstone : Kafka中提供了一个墓碑消息tombstone的概念如果一条消息的key不为null但是其value为null那么此消息就是墓碑消息. 出现场景
双流join时采用的是left join的方式众所周知该方式会产生回撤流下游kafka连接器使用的是upsert-kafka在产生回撤流时kafka会删除未join上的消息填充join后的消息进去。
表现 问题
此时消费该topic的flink程序会出现空指针异常 DataStream Api会出现Table Api 未发现 解决
自定义kafka反序列化器过滤Null值flink1.14.4 代码
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers().setTopics(test).setGroupId(gid).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new MySimpleStringSchema()).setProperty(auto.offset.commit, false).build();DataStreamSourceString kfkDs env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kfk);kfkDs.print();env.execute();}// 自定义反序列化器static class MySimpleStringSchema implements DeserializationSchemaString, SerializationSchemaString{Overridepublic String deserialize(byte[] message) {if (message ! null) return new String(message, StandardCharsets.UTF_8);else{return deserialize(new byte[1]); // 返回空 不是Null}}Overridepublic boolean isEndOfStream(String nextElement) {return false;}Overridepublic byte[] serialize(String element) {return element.getBytes(StandardCharsets.UTF_8);}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}