当前位置: 首页 > news >正文

深圳好的网站制作哪家快嘉祥做网站

深圳好的网站制作哪家快,嘉祥做网站,长沙网站建站模板,东莞工厂网站建设背景 在flink中#xff0c;我们需要对我们写的map转换函数#xff0c;process处理函数进行单元测试#xff0c;测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新#xff0c;本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…背景 在flink中我们需要对我们写的map转换函数process处理函数进行单元测试测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测试的是数据流的类型选择不同的测试套件如下所示 OneInputStreamOperatorTestHarness适用于 DataStreams 数据流KeyedOneInputStreamOperatorTestHarness适用于 KeyedStreams 分组后的数据流TwoInputStreamOperatorTestHarness适用于两个数据流DataStream的 ConnectedStreamKeyedTwoInputStreamOperatorTestHarness适用于两个 KeyedStream 的 ConnectedStream 其次根据是测试map函数还是process函数我们选择不同的操作符如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子 map函数测试代码: Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarnessString, String testHarness // KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarnessString, String, String(new StreamFlatMap(statefulFlatMap),x - 1, Types.STRING);testHarness.open();// test first recordtestHarness.processElement(world, 10);ValueStateString previousInput statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor(previousInput, Types.STRING));String stateValue previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals(world, stateValue);// test second recordtestHarness.processElement(parallel, 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10),new StreamRecord(hello parallel world, 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals(parallel, previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunctionString, String {ValueStateString previousInput;Overridepublic void open(Configuration parameters) throws Exception {previousInput getRuntimeContext().getState(new ValueStateDescriptorString(previousInput, Types.STRING));}Overridepublic void flatMap(String in, CollectorString collector) throws Exception {String out hello in;if(previousInput.value() ! null){out out previousInput.value();}previousInput.update(in);collector.collect(out);} }process处理函数代码: Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction new MyProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x - 1, Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement(world, 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10)),testHarness.extractOutputStreamRecords());}Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction new MyProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x - 1, Types.STRING);testHarness.open();testHarness.processElement(world, 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10),new StreamRecord(Timer triggered at timestamp 50)),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunctionString, String, String {Overridepublic void processElement(String in, Context context, CollectorString collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out hello in;collector.collect(out);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {out.collect(String.format(Timer triggered at timestamp %d, timestamp));}}此外附加官方的map函数的测试代码 /** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the License); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector;import org.junit.Assert; import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {link StreamMap}. These test that:** ul* liRichFunction methods are called correctly* liTimestamps of processed elements match the input timestamp* liWatermarks are correctly forwarded* /ul*/ public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunctionInteger, Integer {private static final long serialVersionUID 1L;Overridepublic void flatMap(Integer value, CollectorInteger out) throws Exception {if (value % 2 0) {out.collect(value);out.collect(value * value);}}}Testpublic void testFlatMap() throws Exception {StreamFlatMapInteger, Integer operator new StreamFlatMapInteger, Integer(new MyFlatMap());OneInputStreamOperatorTestHarnessInteger, Integer testHarness new OneInputStreamOperatorTestHarnessInteger, Integer(operator);long initialTime 0L;ConcurrentLinkedQueueObject expectedOutput new ConcurrentLinkedQueueObject();testHarness.open();testHarness.processElement(new StreamRecordInteger(1, initialTime 1));testHarness.processElement(new StreamRecordInteger(2, initialTime 2));testHarness.processWatermark(new Watermark(initialTime 2));testHarness.processElement(new StreamRecordInteger(3, initialTime 3));testHarness.processElement(new StreamRecordInteger(4, initialTime 4));testHarness.processElement(new StreamRecordInteger(5, initialTime 5));testHarness.processElement(new StreamRecordInteger(6, initialTime 6));testHarness.processElement(new StreamRecordInteger(7, initialTime 7));testHarness.processElement(new StreamRecordInteger(8, initialTime 8));expectedOutput.add(new StreamRecordInteger(2, initialTime 2));expectedOutput.add(new StreamRecordInteger(4, initialTime 2));expectedOutput.add(new Watermark(initialTime 2));expectedOutput.add(new StreamRecordInteger(4, initialTime 4));expectedOutput.add(new StreamRecordInteger(16, initialTime 4));expectedOutput.add(new StreamRecordInteger(6, initialTime 6));expectedOutput.add(new StreamRecordInteger(36, initialTime 6));expectedOutput.add(new StreamRecordInteger(8, initialTime 8));expectedOutput.add(new StreamRecordInteger(64, initialTime 8));TestHarnessUtil.assertOutputEquals(Output was not correct., expectedOutput, testHarness.getOutput());}Testpublic void testOpenClose() throws Exception {StreamFlatMapString, String operator new StreamFlatMapString, String(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarnessString, String testHarness new OneInputStreamOperatorTestHarnessString, String(operator);long initialTime 0L;testHarness.open();testHarness.processElement(new StreamRecordString(Hello, initialTime));testHarness.close();Assert.assertTrue(RichFunction methods where not called., TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue(Output contains no elements., testHarness.getOutput().size() 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunctionString, String {private static final long serialVersionUID 1L;public static boolean openCalled false;public static boolean closeCalled false;Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail(Close called before open.);}openCalled true;}Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail(Open was not called before close.);}closeCalled true;}Overridepublic void flatMap(String value, CollectorString out) throws Exception {if (!openCalled) {Assert.fail(Open was not called before run.);}out.collect(value);}} } 包含同时测试FlatMap和RichFlatMap函数但是其中没有操作状态我前面的例子包含了RichFlatMap状态的测试 参考文献 https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/
http://www.dnsts.com.cn/news/128776.html

相关文章:

  • 笔记本怎么建设网站wordpress seo设置
  • 石景山老山网站建设freeserver 免费服务器申请
  • 淄博市建设工程质量协会网站中国十大劳务派遣公司
  • 广东智能网站建设费用北京学校网站建设公司
  • 饲料公司网站建设方案免费建站哪个比较好
  • 淘宝网站c 设计怎么做旅行社网站建设规划的内容
  • 广州制作网站公司哪家好wordpress主机教程
  • 北京网站备案查询python 采集 wordpress
  • 阜阳网站制作公司多少钱建设网站天河区
  • 如何更改网站的关键词珠海网站建设q479185700强涵
  • 南阳网站排名公司中国志愿者服务网站登录注册
  • 学科建设网站桂建云平台注册
  • 枣庄手机网站开发公司专业网站建设加工
  • 网站后台管理维护 不懂编程如何网站数据备份
  • 合肥网站建设策划方案企业为什么要年检
  • 网站静态生成目录 名称 建议装修推荐平台
  • 欢迎访问建设银行网站海南响应式网页建设方案
  • 毕业设计做网站教程自助网站免费建站平台
  • 苏州网站提升排名注册商标查询网
  • 网站改版解决方案wordpress可以承受多大数据
  • 三亚网站定制开发公司网站建设的十点优势
  • 北京制作小程序深圳网络优化
  • 网站推广公司网站wordpress主题开发出
  • 石家庄网站建设人员设计素材网站排名
  • 了解网站的建设心得开发定制网站公司
  • app软件商店做网站优化步骤
  • 优秀网站的链接wordpress标签云不显示图片
  • 网站欢迎屏怎么做在别的公司做的网站
  • seo推广公司网站模板wordpress改页脚
  • 钦州市建设工程质量监督站网站页面设计稿