当前位置: 首页 > news >正文

phpstudy配置网站品牌推广途径

phpstudy配置网站,品牌推广途径,如何在手机上自己制作软件,销售培训课程一. Source 简介 DataStream是Flink的低级API#xff0c;用于进行数据的实时处理#xff0c;Flink编程模型分为Source、Transformation、Sink三个部分#xff0c;如下图所示。 默认Flink提供了大量的内置Source#xff0c;常见的Source如下#xff1a; 基于文件的Sour…一. Source 简介 DataStream是Flink的低级API用于进行数据的实时处理Flink编程模型分为Source、Transformation、Sink三个部分如下图所示。 默认Flink提供了大量的内置Source常见的Source如下 基于文件的Source基于Socket的Source基于集合的Source基于Kafka消息队列的Source 当以上内置Source不能满足业务需要时可以实现自定义Source。 Flink中有关Source的接口类的继承关系如下 SourceFunction单并行度Source的基类RichSourceFunction单并行度增强型Source的基类ParallelSourceFunction多并行度Source的基类RichParallelSourceFunction多并行度增强型Source的基类 二. 自定义单并行度Source 自定义单并行度的source需要实现SourceFunction接口。 代码实现 MySource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random;public class MySource implements SourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {// Num加上0~100的随机数生成一个字符串ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MySource());source.print();env.execute(source_demo);} }运行结果 5 Num: 62 6 Num: 91 7 Num: 13 8 Num: 53三. 自定义多并行度Source 自定义多并行度的source需要实现ParallelSourceFunction接口。 代码实现 MyParallelSource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import java.util.Random;public class MyParallelSource implements ParallelSourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MyParallelSource());source.print();env.execute(source_demo);} } 运行结果 7 Num: 43 8 Num: 30 1 Num: 92 2 Num: 50 5 Num: 39 6 Num: 6 4 Num: 20 3 Num: 2四. 自定义单并行度增强型Source 增强型Source额外提供了open和close方法可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。 在mysql中创建student表并插入三条数据。 create table student (id int primary key,name varchar(50),age int );insert into student values(1, name1, 20),(2, name2, 30), (3, name3, 15);实现代码 Student.java package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id id;this.name name;this.age age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return Student{ id id , name name \ , age age };} } MysqlSource.java package flink.basic.source;import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;public class MysqlSource extends RichSourceFunctionStudent {Connection conn;Statement stmt;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);String url jdbc:mysql://192.168.47.130:3306/test;String user root;String password root;conn DriverManager.getConnection(url,user,password);stmt conn.createStatement();}Overridepublic void run(SourceContextStudent ctx) throws Exception {ResultSet rs stmt.executeQuery(select * from student);while (rs.next()) {int id rs.getInt(id);String name rs.getString(name);int age rs.getInt(age);ctx.collect(new Student(id, name, age));}rs.close();}Overridepublic void cancel() {}Overridepublic void close() throws Exception {stmt.close();conn.close();} } SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSourceStudent source env.addSource(new MysqlSource());source.print();env.execute(source_demo);} }运行结果 1 Student{id3, namename3, age15} 8 Student{id2, namename2, age30} 7 Student{id1, namename1, age20}
http://www.dnsts.com.cn/news/41774.html

相关文章:

  • 男女做暧昧视频网站2023还能上的网站
  • php 可以自己做网站吗莆田外贸专业建站
  • 创业型企业网站模板阳江房产网上半年海怡新
  • 做邀请函的网站搜索引擎优化员简历
  • 龙华网站建设销售员中国海关数据查询平台
  • 大型网站建设价格多少查询百度关键词排名
  • 注塑模具东莞网站建设租电信服务器开网站
  • 十堰市有几家网站公司如何把字体导入wordpress
  • 购物网站的推广网站首页设计报告
  • 国外做贸易网站装修行业网站建设
  • 湖北省疾病预防控制中心云南网站优化公司
  • 织梦网站百度推送加哪濮阳网络运输证
  • 做网站有哪些注意事项网站建设价格标准
  • 用ps做网站首页顶部图片做App和网站 聚马
  • 有做企业网站的吗教育网站制作哪专业
  • 网站策划预算怎么做广告策划书封面
  • 晓风彩票网站建设源代码授权wordpress total主题
  • 桐庐建设局网站网站的建设域名空间
  • crm系统 网站建设关于申请建设门户网站的
  • 网站后台管理模板免费下载在线crm系统价格
  • 射击游戏网页版seo基础篇
  • 一般网站服务器配置广宁网站建设
  • 网站制作需要多少钱官网wordpress如何屏蔽特定国家的用户
  • 做 个收废品网站漫威网页制作教程
  • 外汇直播室都是网站做创网站 设计方案
  • 网络彩票的网站怎么做搜索大全搜索引擎
  • 东莞建站网站建设产品推广wordpress建两个网站吗
  • 个人政务公开网站建设工作总结宝塔搭建wordpress访问很慢
  • 杭州公司的网站建设公司做网站后有人抢注关键词
  • 国外做油画的网站广州h5设计网站公司