当前位置: 首页 > news >正文

phpstudy配置网站网站开发需要什么东西

phpstudy配置网站,网站开发需要什么东西,做教程的网站内容怎么找,网站套餐到期什么意思一. Source 简介 DataStream是Flink的低级API#xff0c;用于进行数据的实时处理#xff0c;Flink编程模型分为Source、Transformation、Sink三个部分#xff0c;如下图所示。 默认Flink提供了大量的内置Source#xff0c;常见的Source如下#xff1a; 基于文件的Sour…一. Source 简介 DataStream是Flink的低级API用于进行数据的实时处理Flink编程模型分为Source、Transformation、Sink三个部分如下图所示。 默认Flink提供了大量的内置Source常见的Source如下 基于文件的Source基于Socket的Source基于集合的Source基于Kafka消息队列的Source 当以上内置Source不能满足业务需要时可以实现自定义Source。 Flink中有关Source的接口类的继承关系如下 SourceFunction单并行度Source的基类RichSourceFunction单并行度增强型Source的基类ParallelSourceFunction多并行度Source的基类RichParallelSourceFunction多并行度增强型Source的基类 二. 自定义单并行度Source 自定义单并行度的source需要实现SourceFunction接口。 代码实现 MySource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random;public class MySource implements SourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {// Num加上0~100的随机数生成一个字符串ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MySource());source.print();env.execute(source_demo);} }运行结果 5 Num: 62 6 Num: 91 7 Num: 13 8 Num: 53三. 自定义多并行度Source 自定义多并行度的source需要实现ParallelSourceFunction接口。 代码实现 MyParallelSource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import java.util.Random;public class MyParallelSource implements ParallelSourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MyParallelSource());source.print();env.execute(source_demo);} } 运行结果 7 Num: 43 8 Num: 30 1 Num: 92 2 Num: 50 5 Num: 39 6 Num: 6 4 Num: 20 3 Num: 2四. 自定义单并行度增强型Source 增强型Source额外提供了open和close方法可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。 在mysql中创建student表并插入三条数据。 create table student (id int primary key,name varchar(50),age int );insert into student values(1, name1, 20),(2, name2, 30), (3, name3, 15);实现代码 Student.java package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id id;this.name name;this.age age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return Student{ id id , name name \ , age age };} } MysqlSource.java package flink.basic.source;import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;public class MysqlSource extends RichSourceFunctionStudent {Connection conn;Statement stmt;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);String url jdbc:mysql://192.168.47.130:3306/test;String user root;String password root;conn DriverManager.getConnection(url,user,password);stmt conn.createStatement();}Overridepublic void run(SourceContextStudent ctx) throws Exception {ResultSet rs stmt.executeQuery(select * from student);while (rs.next()) {int id rs.getInt(id);String name rs.getString(name);int age rs.getInt(age);ctx.collect(new Student(id, name, age));}rs.close();}Overridepublic void cancel() {}Overridepublic void close() throws Exception {stmt.close();conn.close();} } SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSourceStudent source env.addSource(new MysqlSource());source.print();env.execute(source_demo);} }运行结果 1 Student{id3, namename3, age15} 8 Student{id2, namename2, age30} 7 Student{id1, namename1, age20}
http://www.dnsts.com.cn/news/9218.html

相关文章:

  • 网站改版 合同一女被多男做的视频网站
  • 万网注册域名就可以做网站吗wordpress 顶部导航
  • php制作网站关键词点击工具
  • 网站续费要多少钱aspnet网站开发选择题
  • 茶叶网站模板下载图片制作教程
  • 黄石做网站要多少钱成都网站开发工资
  • 公司网站建设功能介绍网站建设售后服务承诺书
  • 奢侈品网站怎么做tuig优化人才网招聘官方网
  • 外贸在哪些网站做世界互联网峰会马云
  • 网站制作公司 云南装修设计公司公司价格表
  • 网站开发的选题依据能做SEO优化的网站建设
  • 许昌企业网站建设建设网站的各种问题
  • 福建建设执业注册管理中心网站推广顾问
  • 网站排名推广自己怎么做wordpress资源源码
  • 网站建设和管理什么意思个人简历模板在线编辑免费
  • 网站推广方法渠道男科医院收费一览表
  • 哪个软件可以做明星视频网站公司起名字大全免费打分
  • 网站建设中 html5做文具的网站
  • 济南手机网站网店设计公司
  • 来凡网站建设公司网站文章优化流程方案
  • 网站设计的国际专业流程长春火车站最新防疫要求
  • 孝感市建设网站做蛋糕哪个教程网站好
  • 新网站seo技术龙华附近网站建设公司
  • 界面设计优秀的网站有哪些网站课程建设申报书
  • 搭建网站多少时间电子商务网站建设实用教程教案
  • 网站开发公司云鲸互创怎么联系网页超链接怎么做
  • 郑州建网站需要多少钱去西安旅游最佳路线
  • 辛集建设网站永州做网站公司
  • 最好用的免费建站平台营销型网站建设页面
  • 做电影解析网站阿里云域名备案网站建设方案书