当前位置: 首页 > news >正文

搭建网站用什么语言域名维护一个年多少钱

搭建网站用什么语言,域名维护一个年多少钱,小程序问答库,数据库网站有哪些引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合#xff0c;它比ProcessFunction拥有更多特性#xff0c;例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解#xff0c;KeyedProc…引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合它比ProcessFunction拥有更多特性例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解KeyedProcessFunction的注解如下 /*** A keyed function that processes elements of a stream.** pFor every element in the input stream {link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {link Context}. For firing timers {link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** pbNOTE:/b Access to keyed state and timers (which are also scoped to a key) is only* available if the {code KeyedProcessFunction} is applied on a {code KeyedStream}.** pbNOTE:/b A {code KeyedProcessFunction} is always a {link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {link org.apache.flink.api.common.functions.RichFunction#close()}.*/上面简单来说就是以下四点 Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用通过这个方法的Context参数可以设置定时器在开启定时器后会程序会定时调用onTimer方法由于KeyedProcessFunction实现了RichFunction接口因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放需要注意的是只有在KeyedStream里才能够访问state和定时器通俗点来说就是这个函数要用在keyBy这个函数的后面 processElement方法解析 Flink会调用processElement方法处理输入流中的每一条数据KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据 onTimer方法解析在启用TimerService服务时会定时触发此方法一般会在processElement方法中开启TimerService服务 以上就是这个函数的基本知识接下来就通过实战来熟悉下它的使用 实战简介 本次实战的目标是学习KeyedProcessFunction内容如下 监听本机7777端口读取字符串将每个字符串用空格分隔转成Tuple2实例f0是分隔后的单词f1等于1将Tuple2实例集合通过f0字段分区得到KeyedStreamKeyedSteam通过自定义KeyedProcessFunction处理自定义KeyedProcessFunction的作用是记录每个单词最新一次出现的时间然后建一个十秒的定时器进行触发 使用代码例子 首先定义pojo类 public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return count;}public String getKey() {return key;}public void setKey(String key) {this.key key;}public long getCount() {return count;}public void setCount(long count) {this.count count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp lastQuestTimestamp;} }接着实现KeyedProcessFunction类 public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunctionTuple, Tuple2String, Integer, Tuple2String, Long {private ValueStateCountWithTimestampNew state;Overridepublic void open(Configuration parameters) throws Exception {state getRuntimeContext().getState(new ValueStateDescriptorCountWithTimestampNew(sherlock-state, CountWithTimestampNew.class));}// 实现数据处理逻辑的地方Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();if (countWithTimestampNew null) {countWithTimestampNew new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗推测state对象是跟key绑定针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器十秒后触发long timer countWithTimestampNew.getLastQuestTimestamp()10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息用于确保数据准确性System.out.println(String.format( 触发processElement方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout false;if (timestamp countWithTimestampNew.getLastQuestTimestamp()10000 ) {//out.collect(new Tuple2(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout true;}//打印所有信息用于确保数据准确性System.out.println(String.format( 触发onTimer方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));} }最后是启动类 public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口读取字符串DataStreamString socketDataStream env.socketTextStream(localhost, 7777);// 所有输入的单词如果超过10秒没有再次出现都可以通过CountWithTimeoutFunction得到DataStreamTuple2String, Long timeOutWord socketDataStream// 对收到的字符串用空格做分割得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Integer() {Overridepublic long extractTimestamp(Tuple2String, Integer element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词如果超过10秒没有再次出现就在此打印出来timeOutWord.print();env.execute(ProcessFunction demo : KeyedProcessFunction);} }演示 在启动服务前先通过linux指令监听端口 nc -lk 7777 启动Flink服务后往7777端口里面发送数据 通过IDEA的终端可以看到有日志输出可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志 那么咱们尝试连续发送两条Hello呢可以看到累加器会持续累加并且会触发两次onTimer方法也就是每一条消息都会触发一次。由于连续发送两条因此可以看得到第三行日志的末尾是false说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来 再输入点其他的试试 通过输出可以看到这些单词的计数器又从0开始说明每一个Key都对应一个状态 思考题 open方法会在哪里进行调用KeyedProcessFunction整个类的完整调用逻辑是怎么样的registerProcessingTimeTimer和registerEventTimeTimer的差异是什么 参考资料 https://blog.csdn.net/boling_cavalry/article/details/106299167https://blog.csdn.net/lujisen/article/details/105510532https://blog.csdn.net/qq_31866793/article/details/102831731
http://www.dnsts.com.cn/news/134380.html

相关文章:

  • 钦州市建设工程质量监督站网站唐山公司网站建设
  • 大连建立网站公司wordpress淘宝客手机
  • 韵达快递小网站怎么做信息流优化师招聘
  • 微信网站建设流程网页编辑器在线使用
  • 网站建设行业企业发展前景wordpress看访问量
  • 客栈网站建设让你做一个旅游网站你会怎么做
  • 浙江省建设通网站网站源代码使用
  • 足球网站界面设计网站建设氺首选金手指13
  • 网站最下面版权模板广州安全教育平台官网
  • 微信彩票网站网站建设湖南建设厅网站
  • 手机网站定制方案做展示类网站
  • 网站开发工程师需要什么证书全国信息企业公示网官网查询
  • 天津市工程建设项目报建网站seo搜索引擎优化
  • 专业网站设计wordpress plugin开发
  • 上海网站建设褐公洲司vps服务器购买
  • 发布一个网站要多少钱小学生制作ppt的软件
  • 盐城市亭湖区建设局网站国外ip代理app
  • 一级域名的网站怎么做免费制作
  • 温建设文件发布在哪个网站风行ppt模板网
  • 网站备案后 还是需要再备案吗怎样查看一个网站的域名
  • 网站如何做背景音乐中国腾讯和联通
  • 东莞网站建设外贸外贸网站虚拟空间
  • 商城网站系flash网站模板源码
  • 北京网站建设公司哪家实惠用路由侠做网站
  • 网站里面的链接怎么做的一个空间怎么做两个网站 跳转
  • 织梦系统做网站如何做网站的版块规划
  • 自已如何做网站05网寒假作业答案
  • 手机网站与微信结合没有备案的网站怎么做淘宝客
  • 网站扒皮下载后怎么做西安h5响应式网站
  • 香河家具城网站建设目标一家专门做海报的网站