深圳好的网站制作哪家快,嘉祥做网站,长沙网站建站模板,东莞工厂网站建设背景
在flink中#xff0c;我们需要对我们写的map转换函数#xff0c;process处理函数进行单元测试#xff0c;测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新#xff0c;本文就记录几个测试过程中的要点
flink中测试函数
首先我们根据我们要测…背景
在flink中我们需要对我们写的map转换函数process处理函数进行单元测试测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新本文就记录几个测试过程中的要点
flink中测试函数
首先我们根据我们要测试的是数据流的类型选择不同的测试套件如下所示
OneInputStreamOperatorTestHarness适用于 DataStreams 数据流KeyedOneInputStreamOperatorTestHarness适用于 KeyedStreams 分组后的数据流TwoInputStreamOperatorTestHarness适用于两个数据流DataStream的 ConnectedStreamKeyedTwoInputStreamOperatorTestHarness适用于两个 KeyedStream 的 ConnectedStream
其次根据是测试map函数还是process函数我们选择不同的操作符如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子
map函数测试代码:
Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarnessString, String testHarness // KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarnessString, String, String(new StreamFlatMap(statefulFlatMap),x - 1, Types.STRING);testHarness.open();// test first recordtestHarness.processElement(world, 10);ValueStateString previousInput statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor(previousInput, Types.STRING));String stateValue previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals(world, stateValue);// test second recordtestHarness.processElement(parallel, 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10),new StreamRecord(hello parallel world, 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals(parallel, previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunctionString, String {ValueStateString previousInput;Overridepublic void open(Configuration parameters) throws Exception {previousInput getRuntimeContext().getState(new ValueStateDescriptorString(previousInput, Types.STRING));}Overridepublic void flatMap(String in, CollectorString collector) throws Exception {String out hello in;if(previousInput.value() ! null){out out previousInput.value();}previousInput.update(in);collector.collect(out);}
}process处理函数代码:
Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction new MyProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x - 1, Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement(world, 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10)),testHarness.extractOutputStreamRecords());}Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction new MyProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x - 1, Types.STRING);testHarness.open();testHarness.processElement(world, 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10),new StreamRecord(Timer triggered at timestamp 50)),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunctionString, String, String {Overridepublic void processElement(String in, Context context, CollectorString collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out hello in;collector.collect(out);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {out.collect(String.format(Timer triggered at timestamp %d, timestamp));}}此外附加官方的map函数的测试代码
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the License); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {link StreamMap}. These test that:** ul* liRichFunction methods are called correctly* liTimestamps of processed elements match the input timestamp* liWatermarks are correctly forwarded* /ul*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunctionInteger, Integer {private static final long serialVersionUID 1L;Overridepublic void flatMap(Integer value, CollectorInteger out) throws Exception {if (value % 2 0) {out.collect(value);out.collect(value * value);}}}Testpublic void testFlatMap() throws Exception {StreamFlatMapInteger, Integer operator new StreamFlatMapInteger, Integer(new MyFlatMap());OneInputStreamOperatorTestHarnessInteger, Integer testHarness new OneInputStreamOperatorTestHarnessInteger, Integer(operator);long initialTime 0L;ConcurrentLinkedQueueObject expectedOutput new ConcurrentLinkedQueueObject();testHarness.open();testHarness.processElement(new StreamRecordInteger(1, initialTime 1));testHarness.processElement(new StreamRecordInteger(2, initialTime 2));testHarness.processWatermark(new Watermark(initialTime 2));testHarness.processElement(new StreamRecordInteger(3, initialTime 3));testHarness.processElement(new StreamRecordInteger(4, initialTime 4));testHarness.processElement(new StreamRecordInteger(5, initialTime 5));testHarness.processElement(new StreamRecordInteger(6, initialTime 6));testHarness.processElement(new StreamRecordInteger(7, initialTime 7));testHarness.processElement(new StreamRecordInteger(8, initialTime 8));expectedOutput.add(new StreamRecordInteger(2, initialTime 2));expectedOutput.add(new StreamRecordInteger(4, initialTime 2));expectedOutput.add(new Watermark(initialTime 2));expectedOutput.add(new StreamRecordInteger(4, initialTime 4));expectedOutput.add(new StreamRecordInteger(16, initialTime 4));expectedOutput.add(new StreamRecordInteger(6, initialTime 6));expectedOutput.add(new StreamRecordInteger(36, initialTime 6));expectedOutput.add(new StreamRecordInteger(8, initialTime 8));expectedOutput.add(new StreamRecordInteger(64, initialTime 8));TestHarnessUtil.assertOutputEquals(Output was not correct., expectedOutput, testHarness.getOutput());}Testpublic void testOpenClose() throws Exception {StreamFlatMapString, String operator new StreamFlatMapString, String(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarnessString, String testHarness new OneInputStreamOperatorTestHarnessString, String(operator);long initialTime 0L;testHarness.open();testHarness.processElement(new StreamRecordString(Hello, initialTime));testHarness.close();Assert.assertTrue(RichFunction methods where not called., TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue(Output contains no elements., testHarness.getOutput().size() 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunctionString, String {private static final long serialVersionUID 1L;public static boolean openCalled false;public static boolean closeCalled false;Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail(Close called before open.);}openCalled true;}Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail(Open was not called before close.);}closeCalled true;}Overridepublic void flatMap(String value, CollectorString out) throws Exception {if (!openCalled) {Assert.fail(Open was not called before run.);}out.collect(value);}}
}
包含同时测试FlatMap和RichFlatMap函数但是其中没有操作状态我前面的例子包含了RichFlatMap状态的测试
参考文献 https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/