当前位置: 首页 > news >正文

潍坊网站制作公司冷水滩做微信网站

潍坊网站制作公司,冷水滩做微信网站,网站开发商城app,腾讯公司做的购物网站学习笔记 Flink作为数据处理框架#xff0c;最终还是要把计算处理的结果写入外部存储#xff0c;为外部应用提供支持。 文章目录 **连接到外部系统****输出到文件**输出到 Kafka输出到 mysql自定义 sink 连接到外部系统 Flink的DataStream API专门提供了向外部写入数据的方… 学习笔记 Flink作为数据处理框架最终还是要把计算处理的结果写入外部存储为外部应用提供支持。 文章目录 **连接到外部系统****输出到文件**输出到 Kafka输出到 mysql自定义 sink 连接到外部系统 Flink的DataStream API专门提供了向外部写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。 Flink1.12以前Sink算子的创建是通过调用DataStream的.addSink()方法实现的。 stream.addSink(new SinkFunction(…)); addSink方法同样需要传入一个参数实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke()用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。 Flink1.12开始同样重构了Sink架构 stream.sinkTo(…) 当然Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示列出了Flink官方目前支持的第三方系统连接器 https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/ 我们可以看到像Kafka之类流式系统Flink提供了完美对接source/sink两端都能连接可读可写而对于Elasticsearch、JDBC等数据存储系统则只提供了输出写入的sink连接器。 除Flink官方之外Apache Bahir框架也实现了一些其他第三方系统与Flink的连接器。 除此以外就需要用户自定义实现sink连接器了。 输出到文件 Flink专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统。 FileSink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用FileSink的静态方法 行编码 FileSink.forRowFormatbasePathrowEncoder。批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。 public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator);// 输出到文件系统FileSinkString fieSink FileSink// 输出行式存储的文件指定路径、指定编码.StringforRowFormat(new Path(f:/tmp), new SimpleStringEncoder(UTF-8))// 输出文件的一些配置 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(atguigu-).withPartSuffix(.log).build())// 按照目录分桶如下就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 文件滚动策略: 1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();} } 输出到 Kafka 1添加Kafka 连接器依赖 由于我们已经测试过从Kafka数据源读取数据连接器相关依赖已经引入这里就不重复介绍了。 2启动Kafka集群 3编写输出到Kafka的示例代码 public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次必须开启checkpoint后续章节介绍env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** Kafka Sink:* TODO 注意如果要使用 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint后续介绍* 2、设置事务前缀* 3、设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();} }自定义序列化器实现带key的record: public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** 如果要指定写入kafka的key可以自定义序列化器* 1、实现 一个接口重写 序列化 方法* 2、指定key转成 字节数组* 3、指定value转成 字节数组* 4、返回一个 ProducerRecord对象把key、value放进去*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setRecordSerializer(new KafkaRecordSerializationSchemaString() {NullableOverridepublic ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(ws, key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(atguigu-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();} }输出到 mysql 写入数据的MySQL的测试步骤如下。 1添加依赖 添加MySQL驱动 dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.27/version /dependency官方还未提供flink-connector-jdbc的1.17.0的正式依赖暂时从apache snapshot仓库下载pom文件中指定仓库路径 repositoriesrepositoryidapache-snapshots/idnameapache snapshots/name urlhttps://repository.apache.org/content/repositories/snapshots//url/repository /repositories添加依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version /dependency如果不生效还需要修改本地maven的配置文件mirrorOf中添加如下标红内容 mirroridaliyunmaven/idmirrorOf*,!apache-snapshots/mirrorOfname阿里云公共仓库/nameurlhttps://maven.aliyun.com/repository/public/url /mirror2启动MySQL在test库下建表ws mysql CREATE TABLE ws ( id varchar(100) NOT NULL, ts bigint(20) DEFAULT NULL, vc int(11) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf83编写输出到MySQL的示例代码 public class SinkMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperatorWaterSensor sensorDS env .socketTextStream(hadoop102, 7777) .map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法 addsink* 2、JDBCSink的4个参数:* 第一个参数 执行的sql一般就是 insert into* 第二个参数 预编译sql 对占位符填充值* 第三个参数 执行选项 ---》 攒批、重试* 第四个参数 连接选项 ---》 url、用户名、密码*/ SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://hadoop102:3306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(000000).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build() );sensorDS.addSink(jdbcSink);env.execute(); } }4运行代码用客户端连接MySQL查看是否成功写入数据。 自定义 sink 如果我们想将数据存储到我们自己的存储设备中而Flink并没有提供可以直接使用的连接器就只能自定义Sink进行输出了。与Source类似Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类只要实现它通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。 stream.addSink(new MySinkFunction()); 在实现SinkFunction的时候需要重写的一个关键方法invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。 这种方式比较通用对于任何外部存储系统都有效不过自定义Sink想要实现状态一致性并不容易所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现而且在不断地扩充因此自定义的场景并不常见。
http://www.dnsts.com.cn/news/143827.html

相关文章:

  • 苏州网站建设布局网站文章伪原创怎么做
  • 建站公司 商城株洲搜索引擎优化
  • 怎样自己做网络推广网站绍兴百度seo公司
  • 廊坊哪里有做阿里巴巴网站的福田欧曼图片
  • 如何查看网站名称安徽池州做企业网站
  • 东莞网站建设 乐云践新网页制作制作公司
  • 上海网站建站建设html入门
  • 嘉鱼网站建设多少钱网站开发方式有
  • 自己弄个网站个人做收费网站
  • 做销售的去哪个网站应聘做枪版视频网站犯法吗
  • 河北建设网站公司企业营销策划是什么
  • 深圳国外网站建设如何申请网站域名流程
  • 做汽车商城网站wordpress文件上传管理系统
  • 广州市建设职业培训学校网站义乌外贸公司网站
  • 企业网站公众号英文网站如何推广
  • c2c网站代表有哪些运城注册公司
  • 网站优化 工具微站小程序
  • 网站制作导航栏怎么做wordpress后台不能登陆
  • 中商外贸网站网站建设大题
  • 境外建网站企业文化管理咨询
  • 杭州网站改版公司电话湖北省建设厅网站首页
  • 有个网站是做视频相册的app商城系统开发
  • 辽宁网站建设的网络科技公司门户网站建设会议纪要
  • 鄂尔多斯网站推广免费的域名解析
  • 沙田镇仿做网站wordpress近期文章
  • 湛江模板建站服务商微商推广
  • 58同城企业网站怎么做的同城的网站建设
  • 昆山市网站建设金点子招聘信息
  • 南京网站关键词优化咨询wordpress重新安装数据库
  • 益阳建设企业网站网站的建设期