在线生成电子印章,张家口建站优化,网站开发绪论,网站空间邮箱背景
在处理键值分区状态时#xff0c;使用ttl设置过期时间是我们经常使用的#xff0c;但是任何代码的修改都需要首先进行单元测试#xff0c;本文就使用单元测试来验证一下状态ttl的设置是否正确
测试状态ttl超时的单元测试
首先看一下处理函数#xff1a;
// 处理函…背景
在处理键值分区状态时使用ttl设置过期时间是我们经常使用的但是任何代码的修改都需要首先进行单元测试本文就使用单元测试来验证一下状态ttl的设置是否正确
测试状态ttl超时的单元测试
首先看一下处理函数
// 处理函数
public class MyStateProcessFunction extends KeyedProcessFunctionString, String, String {// 键值分区状态ValueStateString previousInput;Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor stateDescriptor new ValueStateDescriptorString(previousInput, Types.STRING);// 状态ttl超时时间设置StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(10, false).build();stateDescriptor.enableTimeToLive(ttlConfig);previousInput getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(String in, Context context, CollectorString collector) throws Exception {context.timerService().registerProcessingTimeTimer(100);String out (Objects.nonNull(previousInput.value()) ? previousInput.value() : ) in;collector.collect(out);if (!in.contains(NotUpdate)) {// 为了模仿有访问状态但是不更新状态正常情况下业务逻辑是访问其他key组的其它state而一直没有访问的key的状态会在超时时间到之后被清理掉previousInput.update(in);}}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {if (Objects.nonNull(previousInput.value())) {out.collect(String.format(timer trigger %s, previousInput.value()));} else {out.collect(String.format(timer trigger state clear, previousInput.value()));}}}单元测试代码:
/*** 测试状态处理函数包含状态的ttl配置,以及ontimer方法**/
Test
public void testKeyedStateProcessFunction() throws Exception {MyStateProcessFunction myStateProcessFunction new MyStateProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x - 1, Types.STRING);testHarness.open();testHarness.processElement(hello, 10);// 注册了一个定时器,定时器100后过期Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// 测试输出Assert.assertEquals(Lists.newArrayList(hello), testHarness.extractOutputValues());ValueStateString previousInput myStateProcessFunction.getRuntimeContext().getState(new ValueStateDescriptor(previousInput, Types.STRING));// 查看下状态应该已经被设置Assert.assertEquals(hello, previousInput.value());testHarness.processElement(world, 10);// 再次测试输出Assert.assertEquals(Lists.newArrayList(hello, helloworld), testHarness.extractOutputValues());// 再次查看下状态应该已经被设置Assert.assertEquals(world, previousInput.value());// 设置时间为1分钟,让状态超时testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());// 触发下状态访问,这样flink就会清理正常生产中不需要这一步访问状态本来就一直在进行中只是可能是其他key分组的状态testHarness.processElement(NotUpdate1, System.currentTimeMillis());// 查看下状态应该已经被清理Assert.assertNull(previousInput.value());// 设置让定时器过期,顺带确认下状态已经被清理testHarness.setProcessingTime(100);// 测试输出(包含两个输入一个定时器的输出)Assert.assertEquals(Lists.newArrayList(hello, helloworld, NotUpdate1, timer trigger state clear),testHarness.extractOutputValues());testHarness.close();
}测试代码中已经包含了详细的注解我们实现自己的ttl单元测试时可以参考下