一. Source 简介
DataStream是Flink的低级API，用于进行数据的实时处理。Flink编程模型分为Source、Transformation、Sink三个部分，如下图所示。
默认Flink提供了大量的内置Source，常见的Source如下：
- 基于文件的Source
- 基于Socket的Source
- 基于集合的Source
- 基于Kafka消息队列的Source
当以上内置Source不能满足业务需要时，可以实现自定义Source。
Flink中有关Source的接口类的继承关系如下：
- SourceFunction：单并行度Source的基类
- RichSourceFunction：单并行度增强型Source的基类
- ParallelSourceFunction：多并行度Source的基类
- RichParallelSourceFunction：多并行度增强型Source的基类
二. 自定义单并行度Source
自定义单并行度的source需要实现SourceFunction接口。
代码实现
MySource.java
package flink.basic.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

/**
 * Custom source that implements {@link SourceFunction} and emits one string
 * of the form {@code "Num: <n>"} per second until it is cancelled.
 */
public class MySource implements SourceFunction<String> {
    // volatile: cancel() is invoked from a different thread than run(), so the
    // write must be visible to the emitting loop.
    volatile boolean running = true;

    /**
     * Emits {@code "Num: "} followed by a random integer in [0, 100) once per
     * second for as long as the source has not been cancelled.
     *
     * @param ctx context used to emit elements
     * @throws Exception if the sleep between emissions is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            // "Num: " plus a random integer in [0, 100)
            ctx.collect("Num: " + random.nextInt(100));
            Thread.sleep(1000);
        }
    }

    /** Clears the running flag so that the loop in {@link #run} exits. */
    @Override
    public void cancel() {
        running = false;
    }
}
SourceDemo.java
package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MySource());source.print();env.execute(source_demo);}
}运行结果
5> Num: 62
6> Num: 91
7> Num: 13
8> Num: 53
三. 自定义多并行度Source
自定义多并行度的source需要实现ParallelSourceFunction接口。
代码实现
MyParallelSource.java
package flink.basic.source;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;

/**
 * Custom source that implements {@link ParallelSourceFunction} and emits one
 * string of the form {@code "Num: <n>"} per second until it is cancelled.
 */
public class MyParallelSource implements ParallelSourceFunction<String> {
    // volatile: cancel() is invoked from a different thread than run(), so the
    // write must be visible to the emitting loop.
    volatile boolean running = true;

    /**
     * Emits {@code "Num: "} followed by a random integer in [0, 100) once per
     * second for as long as the source has not been cancelled.
     *
     * @param ctx context used to emit elements
     * @throws Exception if the sleep between emissions is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            ctx.collect("Num: " + random.nextInt(100));
            Thread.sleep(1000);
        }
    }

    /** Clears the running flag so that the loop in {@link #run} exits. */
    @Override
    public void cancel() {
        running = false;
    }
}
SourceDemo.java
package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString source env.addSource(new MyParallelSource());source.print();env.execute(source_demo);}
}
运行结果
7> Num: 43
8> Num: 30
1> Num: 92
2> Num: 50
5> Num: 39
6> Num: 6
4> Num: 20
3> Num: 2
四. 自定义单并行度增强型Source
增强型Source额外提供了open和close方法，可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。
在mysql中创建student表，并插入三条数据。
-- Table read by the MySQL source example.
create table student (
    id int primary key,
    name varchar(50),
    age int
);

-- Three sample rows; string values must be quoted.
insert into student values (1, 'name1', 20), (2, 'name2', 30), (3, 'name3', 15);
实现代码
Student.java
package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id id;this.name name;this.age age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getAge() {return age;}public void setAge(int age) {this.age age;}Overridepublic String toString() {return Student{ id id , name name \ , age age };}
}
MysqlSource.java
package flink.basic.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Source that extends {@link RichSourceFunction}: it reads every row of the
 * {@code student} table over JDBC and emits each row as a {@link Student}.
 * The connection is created in {@link #open} and released in {@link #close}.
 */
public class MysqlSource extends RichSourceFunction<Student> {
    Connection conn;
    Statement stmt;

    // volatile: cancel() is invoked from a different thread than run().
    private volatile boolean running = true;

    /**
     * Loads the MySQL JDBC driver, opens the connection and creates the
     * statement that {@link #run} uses.
     *
     * @param parameters configuration passed in by the runtime (unused here)
     * @throws Exception if the driver class is missing or the connection fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://192.168.47.130:3306/test";
        String user = "root";
        String password = "root";
        conn = DriverManager.getConnection(url, user, password);
        stmt = conn.createStatement();
    }

    /**
     * Queries the whole {@code student} table once and emits one
     * {@link Student} per row; returns when the rows are exhausted or the
     * source has been cancelled.
     *
     * @param ctx context used to emit elements
     * @throws Exception if the query or reading a row fails
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // try-with-resources closes the ResultSet even if emitting a row throws.
        try (ResultSet rs = stmt.executeQuery("select * from student")) {
            while (running && rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                ctx.collect(new Student(id, name, age));
            }
        }
    }

    /** Clears the running flag so that the row loop in {@link #run} stops. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Closes the statement and the connection. Either may still be null if
     * {@link #open} failed part-way, and the connection is closed even when
     * closing the statement throws.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}
SourceDemo.java
package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSourceStudent source env.addSource(new MysqlSource());source.print();env.execute(source_demo);}
}运行结果
1> Student{id=3, name='name3', age=15}
8> Student{id=2, name='name2', age=30}
7> Student{id=1, name='name1', age=20}