阿里云服务器建网站,福州市市政建设开发有限公司网站,永远免费的域名,清河网站建设公司1.为啥不使用cep呢#xff0c;cep的超时时间设置不好配置化#xff0c;无法满足扩展要求
2.超时怎么界定。A事件发生后#xff0c;过了N时间#xff0c;还没有收到B事件#xff0c;算超时。
代码如下#xff1a; import com.alibaba.fastjson.JSONObject;
import lombo…1.为啥不使用cep呢cep的超时时间设置不好配置化无法满足扩展要求
2.超时怎么界定。A事件发生后过了N时间还没有收到B事件算超时。
代码如下 import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunctionString, JSONObject, JSONObject {private static final long serialVersionUID -61608451659272532L;private transient ValueStateLong firstDataTime;private transient ValueStateLong secondDataTime;private transient ValueStateString eventType;Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptorLong firstDataDescriptor new ValueStateDescriptor(firstDataTime, Long.class);firstDataTime getRuntimeContext().getState(firstDataDescriptor);ValueStateDescriptorLong secondDataDescriptor new ValueStateDescriptor(secondDataTime, Long.class);secondDataTime getRuntimeContext().getState(secondDataDescriptor);ValueStateDescriptorString eventTypeDescriptor new ValueStateDescriptor(eventType, String.class);eventType getRuntimeContext().getState(eventTypeDescriptor);}Overridepublic void processElement(JSONObject value, KeyedProcessFunctionString, JSONObject, JSONObject.Context ctx, CollectorJSONObject out) throws Exception {Long currentTimestamp value.getLong(ts);if (value.containsKey(timeout)) {//异步请求消息long timeout value.getLong(timeout);firstDataTime.update(currentTimestamp timeout);eventType.update(value.getString(event));ctx.timerService().registerProcessingTimeTimer(currentTimestamp timeout);} else {secondDataTime.update(currentTimestamp);}}Overridepublic void onTimer(long timestamp, KeyedProcessFunctionString, JSONObject, JSONObject.OnTimerContext ctx, CollectorJSONObject out) throws Exception {Long firstTime firstDataTime.value();Long lastTime secondDataTime.value();if (lastTime null || (firstTime ! null lastTime firstTime)) {//超时了log.info(AsyncModelTimeoutHandler onTimer handle triggerTime{}, firstTime{}, secondTime{},key{}, timestamp, firstTime, lastTime, ctx.getCurrentKey());JSONObject r new JSONObject();r.put(id, ctx.getCurrentKey());r.put(judgeTime, timestamp);r.put(event, eventType.value());out.collect(r);}firstDataTime.clear();secondDataTime.clear();eventType.clear();}
}