当前位置: 首页 > news >正文

phpstudy配置网站四大门户网站对比分析

phpstudy配置网站,四大门户网站对比分析,友情网,怎么做移动网站吗一. Source 简介 DataStream是Flink的低级API#xff0c;用于进行数据的实时处理#xff0c;Flink编程模型分为Source、Transformation、Sink三个部分#xff0c;如下图所示。 默认Flink提供了大量的内置Source#xff0c;常见的Source如下#xff1a; 基于文件的Sour…一. Source 简介 DataStream是Flink的低级API用于进行数据的实时处理Flink编程模型分为Source、Transformation、Sink三个部分如下图所示。 默认Flink提供了大量的内置Source常见的Source如下 基于文件的Source基于Socket的Source基于集合的Source基于Kafka消息队列的Source 当以上内置Source不能满足业务需要时可以实现自定义Source。 Flink中有关Source的接口类的继承关系如下 SourceFunction单并行度Source的基类RichSourceFunction单并行度增强型Source的基类ParallelSourceFunction多并行度Source的基类RichParallelSourceFunction多并行度增强型Source的基类 二. 自定义单并行度Source 自定义单并行度的source需要实现SourceFunction接口。 代码实现 MySource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random;public class MySource implements SourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {// Num加上0~100的随机数生成一个字符串ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MySource());source.print();env.execute(source_demo);} }运行结果 5 Num: 62 6 Num: 91 7 Num: 13 8 Num: 53三. 自定义多并行度Source 自定义多并行度的source需要实现ParallelSourceFunction接口。 代码实现 MyParallelSource.java package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import java.util.Random;public class MyParallelSource implements ParallelSourceFunctionString {boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {Random random new Random();while (running) {ctx.collect(Num: random.nextInt(100));Thread.sleep(1000);}}Overridepublic void cancel() {running false;} }SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MyParallelSource());source.print();env.execute(source_demo);} } 运行结果 7 Num: 43 8 Num: 30 1 Num: 92 2 Num: 50 5 Num: 39 6 Num: 6 4 Num: 20 3 Num: 2四. 自定义单并行度增强型Source 增强型Source额外提供了open和close方法可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。 在mysql中创建student表并插入三条数据。 create table student (id int primary key,name varchar(50),age int );insert into student values(1, name1, 20),(2, name2, 30), (3, name3, 15);实现代码 Student.java package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id id;this.name name;this.age age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return Student{ id id , name name \ , age age };} } MysqlSource.java package flink.basic.source;import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;public class MysqlSource extends RichSourceFunctionStudent {Connection conn;Statement stmt;Overridepublic void open(Configuration parameters) throws Exception {Class.forName(com.mysql.cj.jdbc.Driver);String url jdbc:mysql://192.168.47.130:3306/test;String user root;String password root;conn DriverManager.getConnection(url,user,password);stmt conn.createStatement();}Overridepublic void run(SourceContextStudent ctx) throws Exception {ResultSet rs stmt.executeQuery(select * from student);while (rs.next()) {int id rs.getInt(id);String name rs.getString(name);int age rs.getInt(age);ctx.collect(new Student(id, name, age));}rs.close();}Overridepublic void cancel() {}Overridepublic void close() throws Exception {stmt.close();conn.close();} } SourceDemo.java package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSourceStudent source env.addSource(new MysqlSource());source.print();env.execute(source_demo);} }运行结果 1 Student{id3, namename3, age15} 8 Student{id2, namename2, age30} 7 Student{id1, namename1, age20}
http://www.dnsts.com.cn/news/190909.html

相关文章:

  • 创建好网站如何把浏览器沈阳沈河seo网站排名优化
  • 合肥专业网站优化费用win7配置不能运行wordpress
  • 例点估算网站开发项目工作量wordpress 文章的形式
  • 做平面的就一定要做网站吗网站开发项目需求文档
  • 个人网站导航html源码丹阳房产网
  • 下列不属于网站建设规划通信技术公司网站建设
  • 池州网站网站建设做一网站APP多少钱
  • 广州海珠网站开发施工企业管理制度
  • 怎么用切片和dw做网站phpcms主题移植wordpress
  • 山东诚祥建设集团公司网站中大型企业网络组网案例
  • 课程网站建设ppt模板凡科做的网站百度收不收录
  • 深圳高端网站案例苏州seo网站推广
  • 做网站怎么选取关键词h5网站和响应式网站区别
  • 专业的网站建设服务商ftp地址格式怎么写
  • 网站建设费1万多入什么科目哪里有免费的网站模板
  • 珠海建站软件wordpress前端
  • 装饰网站建设软件下载加盟平台
  • 城阳网站开发ppt模板下载平台
  • 家居企业网站建设服务上海造价信息网
  • 江苏省建设信息网站管理平台360可以做网站
  • 企业做推广可以发哪些网站深圳论坛网站建设
  • 巴中做网站 微信开发高校健康驿站建设指引
  • 帮开设赌场的网站做美工wordpress 3.9.2 中文
  • 软件定制网站建设能看实物的地图软件
  • 搭建个人网站的两种方法企业网站制作模板免费下载
  • 深圳市门户网站建设哪家好网站开发 保证书
  • led网站建设方案模板公司网站管理
  • 帝国cms更改网站ico新华书店网站建设
  • 买个网约车多少钱啊如何优化网页加载速度
  • 东莞建站网站模板遵义网站制作一般需要多少钱