网站为什么要seo?,没有备案的网站百度能收录吗,深圳网站建设哪个最好,创世网站建设#x1f680; 作者 #xff1a;“大数据小禅” #x1f680; 文章简介 #xff1a;【Flink实战】玩转Flink里面核心的Source Operator实战 #x1f680; 欢迎小伙伴们 点赞#x1f44d;、收藏⭐、留言#x1f4ac; 目录导航 Flink 的API层级介绍Source Operator速览Flin… 作者 “大数据小禅” 文章简介 【Flink实战】玩转Flink里面核心的Source Operator实战 欢迎小伙伴们 点赞、收藏⭐、留言 目录导航 Flink 的API层级介绍Source Operator速览Flink 预定义的Source 数据源 案例实战Flink自定义的Source 数据源案例-订单来源实战 Flink 的API层级介绍Source Operator速览 Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象 第一层是最底层的抽象为有状态实时流处理抽象实现是 Process Function用于底层处理 第二层抽象是 Core APIs许多应用程序不需要使用到上述最底层抽象的 API而是使用 Core APIs 进行开发 例如各种形式的用户自定义转换transformations、联接joins、聚合aggregations、窗口windows和状态state操作等此层 API 中处理的数据类型在每种编程语言中都有其对应的类。 第三层抽象是 Table API。 是以表Table为中心的声明式编程APITable API 使用起来很简洁但是表达能力差 类似数据库中关系模型中的操作比如 select、project、join、group-by 和 aggregate 等允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API但是其程序实现都是 SQL 查询表达式 SQL 抽象与 Table API 抽象之间的关联是非常紧密的 注意Table和SQL层变动多还在持续发展中大致知道即可核心是第一和第二层 Flink编程模型 Source来源 元素集合 env.fromElementsenv.fromColletionenv.fromSequence(start,end); 文件/文件系统 env.readTextFile(本地文件);env.readTextFile(HDFS文件); 基于Socket env.socketTextStream(“ip”, 8888) 自定义Source实现接口自定义数据源rich相关的api更丰富 并行度为1 SourceFunctionRichSourceFunction 并行度大于1 ParallelSourceFunctionRichParallelSourceFunction Connectors与第三方系统进行对接用于source或者sink都可以 Flink本身提供Connector例如kafka、RabbitMQ、ES等注意Flink程序打包一定要将相应的connetor相关类打包进去不然就会失败 Apache Bahir连接器 里面也有kafka、RabbitMQ、ES的连接器更多 总结 和外部系统进行读取写入的 第一种 Flink 里面预定义的 source 和 sink。第二种 Flink 内部也提供部分 Boundled connectors。第三种是第三方 Apache Bahir 项目中的连接器。第四种是通过异步 IO 方式 异步I/O是Flink提供的非常底层的与外部系统交互
Flink 预定义的Source 数据源 案例实战
Source来源 元素集合 env.fromElementsenv.fromColletionenv.fromSequence(start,end); public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//相同类型元素的数据流 sourceDataStreamString stringDS1 env.fromElements(java,SpringBoot, spring cloud,redis, kafka,小滴课堂);stringDS1.print(stringDS1);DataStreamString stringDS2 env.fromCollection(Arrays.asList(微服务项目大课,java,alibabacloud,rabbitmq,hadoop,hbase));stringDS2.print(stringDS2);DataStreamSourceLong longDS3 env.fromSequence(0,10);longDS3.print(longDS3);//DataStream需要调用execute,可以取个名称env.execute(xdclass job);}
文件/文件系统 env.readTextFile(本地文件);env.readTextFile(HDFS文件);
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamString textDS env.readTextFile(/Users/xdclass/Desktop/xdclass_access.log);//DataStreamString textDS env.readTextFile(hdfs://xdclass_node:8010/file/log/words.txt);textDS.print();env.execute(xdclass job);
}基于Socket env.socketTextStream(“ip”, 8888) public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamString stringDataStream env.socketTextStream(127.0.0.1,8888);stringDataStream.print();env.execute( job);
}Flink自定义的Source 数据源案例-订单来源实战 自定义Source实现接口自定义数据源 并行度为1 SourceFunctionRichSourceFunction 并行度大于1 ParallelSourceFunctionRichParallelSourceFunction Rich相关的api更丰富多了Open、Close方法用于初始化连接等 创建接口
Data
AllArgsConstructor
NoArgsConstructor
public class VideoOrder {private String tradeNo;private String title;private int money;private int userId;private Date createTime;}public class VideoOrderSource extends RichParallelSourceFunctionVideoOrder {private volatile Boolean flag true;private Random random new Random();private static ListString list new ArrayList();static {list.add(spring boot2.x课程);list.add(微服务SpringCloud课程);list.add(RabbitMQ消息队列);list.add(Kafka课程);list.add(Flink流式技术课程);list.add(工业级微服务项目大课训练营);list.add(Linux课程);}Overridepublic void run(SourceContextVideoOrder ctx) throws Exception {while (flag){Thread.sleep(1000);String id UUID.randomUUID().toString();int userId random.nextInt(10);int money random.nextInt(100);int videoNum random.nextInt(list.size());String title list.get(videoNum);ctx.collect(new VideoOrder(id,title,money,userId,new Date()));}}/*** 取消任务*/Overridepublic void cancel() {flag false;}
}案例
public static void main(String [] args) throws Exception {//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamVideoOrder videoOrderDataStream env.addSource(new VideoOrderSource());videoOrderDataStream.print();//DataStream需要调用execute,可以取个名称env.execute(custom source job);}
不断产生很多订单