中文博客网站模板,wordpress如何改成cms,国内新冠最新消息,深圳的网站建设的公司Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批一体化处理框架#xff0c;可用于流处理、批处理、复杂事件处理、实时数据仓库构建及为机器学习提供实时数据支持等诸多大数据处理场景。与此同时#xff0c;Flink 拥有丰富的连接器与各类工具#xff0c;可对接…Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批一体化处理框架可用于流处理、批处理、复杂事件处理、实时数据仓库构建及为机器学习提供实时数据支持等诸多大数据处理场景。与此同时Flink 拥有丰富的连接器与各类工具可对接众多不同类型的数据源实现数据的读取与写入。在数据处理的过程中Flink 还提供了一系列可靠的容错机制有力保障任务即便遭遇意外状况依然能稳定、持续运行。
借助 TDengine 的 Flink 连接器Apache Flink 得以与 TDengine 数据库无缝对接一方面能够将经过复杂运算和深度分析后所得到的结果精准存入 TDengine 数据库实现数据的高效存储与管理另一方面也可以从 TDengine 数据库中快速、稳定地读取海量数据并在此基础上进行全面、深入的分析处理充分挖掘数据的潜在价值为企业的决策制定提供有力的数据支持和科学依据极大地提升数据处理的效率和质量增强企业在数字化时代的竞争力和创新能力。
前置条件
准备以下环境
TDengine 服务已部署并正常运行企业及社区版均可taosAdapter 能够正常运行。详细参考 taosAdapter 使用手册Apache Flink v1.19.0 或以上版本已安装。安装 Apache Flink 请参考 官方文档
支持的平台
Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。
版本历史
Flink Connector 版本主要变化TDengine 版本2.0.1Sink 支持对所有继承自 RowData 并已实现的类型进行数据写入-2.0.01. 支持 SQL 查询 TDengine 数据库中的数据 2. 支持 CDC 订阅 TDengine 数据库中的数据 3. 支持 Table SQL 方式读取和写入 TDengine 数据库3.3.5.1 及以上版本1.0.0支持 Sink 功能将来着其他数据源的数据写入到 TDengine3.3.2.0 及以上版本
异常和错误码
在任务执行失败后查看 Flink 任务执行日志确认失败原因
具体的错误码请参考
Error CodeDescriptionSuggested Actions0xa000connection param error连接器参数错误。0xa001the groupid parameter of CDC is incorrectCDC 的 groupid 参数错误。0xa002wrong topic parameter for CDCCDC 的 topic 参数错误。0xa010database name configuration error数据库名配置错误。0xa011table name configuration error表名配置错误。0xa012no data was obtained from the data source从数据源中获取数据失败。0xa013value.deserializer parameter not set未设置序列化方式。0xa014list of column names for target table not set未设置目标表的列名列表。0x2301connection already closed连接已经关闭检查连接情况或重新创建连接去执行相关指令。0x2302this operation is NOT supported currently!当前使用接口不支持可以更换其他连接方式。0x2303invalid variables参数不合法请检查相应接口规范调整参数类型及大小。0x2304statement is closedstatement 已经关闭请检查 statement 是否关闭后再次使用或是连接是否正常。0x2305resultSet is closedresultSet 结果集已经释放请检查 resultSet 是否释放后再次使用。0x230dparameter index out of range参数越界请检查参数的合理范围。0x230econnection already closed连接已经关闭请检查 Connection 是否关闭后再次使用或是连接是否正常。0x230funknown sql type in TDengine请检查 TDengine 支持的 Data Type 类型。0x2315unknown taos type in TDengine在 TDengine 数据类型与 JDBC 数据类型转换时是否指定了正确的 TDengine 数据类型。0x2319user is required创建连接时缺少用户名信息。0x231apassword is required创建连接时缺少密码信息。0x231dcan’t create connection with server within通过增加参数 httpConnectTimeout 增加连接耗时或是请检查与 taosAdapter 之间的连接情况。0x231efailed to complete the task within the specified time通过增加参数 messageWaitTimeout 增加执行耗时或是请检查与 taosAdapter 之间的连接情况。0x2352Unsupported encoding本地连接下指定了不支持的字符编码集。0x2353internal error of database, please see taoslog for more details本地连接执行 prepareStatement 时出现错误请检查 taos log 进行问题定位。0x2354connection is NULL本地连接执行命令时Connection 已经关闭。请检查与 TDengine 的连接情况。0x2355result set is NULL本地连接获取结果集结果集异常请检查连接情况并重试。0x2356invalid num of fields本地连接获取结果集的 meta 信息不匹配。0x2357empty sql string填写正确的 SQL 进行执行。0x2371consumer properties must not be null!创建订阅时参数为空请填写正确的参数。0x2375topic reference has been destroyed创建数据订阅过程中topic 引用被释放。请检查与 TDengine 的连接情况。0x2376failed to set consumer topic, topic name is empty创建数据订阅过程中订阅 topic 名称为空。请检查指定的 topic 名称是否填写正确。0x2377consumer reference has been destroyed订阅数据传输通道已经关闭请检查与 TDengine 的连接情况。0x2378consumer create error创建数据订阅失败请根据错误信息检查 taos log 进行问题定位。0x237avGroup not found in result set VGroup没有分配给当前 consumer由于 Rebalance 机制导致 Consumer 与 VGroup 不是绑定的关系。
数据类型映射
TDengine 目前支持时间戳、数字、字符、布尔类型与 Flink RowData Type 对应类型转换如下
TDengine DataTypeFlink RowDataTypeTIMESTAMPTimestampDataINTIntegerBIGINTLongFLOATFloatDOUBLEDoubleSMALLINTShortTINYINTByteBOOLBooleanBINARYbyte[]NCHARStringDataJSONStringDataVARBINARYbyte[]GEOMETRYbyte[]
使用说明
Flink 语义选择说明
采用 At-Least-Once至少一次语义原因
TDengine 目前不支持事务不能进行频繁的检查点操作和复杂的事务协调。由于 TDengine 采用时间戳作为主键重复数据下游算子可以进行过滤操作避免重复计算。采用 At-Least-Once至少一次确保达到较高的数据处理的性能和较低的数据延时设置方式如下
使用方式:
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);如果使用 Maven 管理项目只需在 pom.xml 中加入以下依赖。
dependencygroupIdcom.taosdata.flink/groupIdartifactIdflink-connector-tdengine/artifactIdversion2.0.1/version
/dependency连接参数
建立连接的参数有 URL 和 Properties。 URL 规范格式为 jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user{user}|password{password}|timezone{timezone}]
参数说明
user登录 TDengine 用户名默认值 ‘root’。password用户登录密码默认值 ‘taosdata’。database_name: 数据库名称。timezone: 时区设置。httpConnectTimeout: 连接超时时间单位 ms 默认值为 60000。messageWaitTimeout: 消息超时时间单位 ms 默认值为 60000。useSSL: 连接中是否使用 SSL。
Source
Source 拉取 TDengine 数据库中的数据并将获取到的数据转换为 Flink 内部可处理的格式和类型并以并行的方式进行读取和分发为后续的数据处理提供高效的输入。 通过设置数据源的并行度实现多个线程并行地从数据源中读取数据提高数据读取的效率和吞吐量充分利用集群资源进行大规模数据处理能力。
Properties 中配置参数如下
TDengineConfigParams.PROPERTY_KEY_USER登录 TDengine 用户名默认值 ‘root’。TDengineConfigParams.PROPERTY_KEY_PASSWORD用户登录密码默认值 ‘taosdata’。TDengineConfigParams.VALUE_DESERIALIZER下游算子接收结果集反序列化方法, 如果接收结果集类型是 Flink 的 RowData仅需要设置为 RowData即可。也可继承 TDengineRecordDeserialization 并实现 convert 和 getProducedType 方法根据 SQL 的 ResultSet 自定义反序列化方式。TDengineConfigParams.TD_BATCH_MODE此参数用于批量将数据推送给下游算子如果设置为 True创建 TDengineSource 对象时需要指定数据类型为 SourceRecords 类型的泛型形式。TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。true: 启用false: 不启用。默认为 false。TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔单位毫秒默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。true: 启用false: 不启用。默认为 false。
按时间分片
用户可以对查询的 SQL 按照时间拆分为多个子任务输入开始时间结束时间拆分间隔时间字段名称系统会按照设置的间隔时间左闭右开进行拆分并行获取数据。
{{#include docs/examples/flink/Main.java:time_interval}}按超级表 TAG 分片
用户可以按照超级表的 TAG 字段将查询的 SQL 拆分为多个查询条件系统会以一个查询条件对应一个子任务的方式对其进行拆分进而并行获取数据。
{{#include docs/examples/flink/Main.java:tag_split}}按表名分片
支持输入多个相同表结构的超级表或普通表进行分片系统会按照一个表一个任务的方式进行拆分进而并行获取数据。
{{#include docs/examples/flink/Main.java:table_split}}使用 Source 连接器
查询结果为 RowData 数据类型示例 RowData Source java {{#include docs/examples/flink/Main.java:source_test}}
批量查询结果示例 Batch Source java {{#include docs/examples/flink/Main.java:source_batch_test}}
查询结果为自定义数据类型示例 Custom Type Source java {{#include docs/examples/flink/Main.java:source_custom_type_test}}
ResultBean 自定义的一个内部类用于定义 Source 查询结果的数据类型。ResultSoureDeserialization 是自定义的一个内部类通过继承 TDengineRecordDeserialization 并实现 convert 和 getProducedType 方法。
CDC 数据订阅
Flink CDC 主要用于提供数据订阅功能能实时监控 TDengine 数据库的数据变化并将这些变更以数据流形式传输到 Flink 中进行处理同时确保数据的一致性和完整性。
Properties 中配置参数如下
TDengineCdcParams.BOOTSTRAP_SERVERSTDengine 服务端所在的ip:port如果使用 WebSocket 连接则为 taosAdapter 所在的ip:port。TDengineCdcParams.CONNECT_USER登录 TDengine 用户名默认值 ‘root’。TDengineCdcParams.CONNECT_PASS用户登录密码默认值 ‘taosdata’。TDengineCdcParams.POLL_INTERVAL_MS拉取数据间隔, 默认 500ms。TDengineCdcParams.VALUE_DESERIALIZER结果集反序列化方法如果接收结果集类型是 Flink 的 RowData仅需要设置为 RowData即可。可以继承 com.taosdata.jdbc.tmq.ReferenceDeserializer并指定结果集 bean实现反序列化。TDengineCdcParams.TMQ_BATCH_MODE此参数用于批量将数据推送给下游算子如果设置为 True创建 TDengineCdcSource 对象时需要指定数据类型为 ConsumerRecords 类型的泛型形式。TDengineCdcParams.GROUP_ID消费组 ID同一消费组共享消费进度。最大长度192。TDengineCdcParams.AUTO_OFFSET_RESET 消费组订阅的初始位置 earliest 从头开始订阅, latest 仅从最新数据开始订阅, 默认 latest。TDengineCdcParams.ENABLE_AUTO_COMMIT是否启用消费位点自动提交true: 自动提交false依赖 checkpoint 时间来提交 默认 false。 注意自动提交模式reader获取完成数据后自动提交不管下游算子是否正确的处理了数据存在数据丢失的风险主要用于为了追求高效的无状态算子场景或是数据一致性要求不高的场景。 TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS消费记录自动提交消费位点时间间隔单位为毫秒。默认值为 5000, 此参数在 ENABLE_AUTO_COMMIT 为 true 生效。TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT是否启用自动重连。true: 启用false: 不启用。默认为 false。TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS自动重连重试间隔单位毫秒默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT自动重连重试次数默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。TDengineCdcParams.TMQ_SESSION_TIMEOUT_MSconsumer 心跳丢失后超时时间超时后会触发 rebalance 逻辑成功后该 consumer 会被删除从3.3.3.0版本开始支持 默认值为 12000取值范围 [6000 1800000]。TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MSconsumer poll 拉取数据间隔的最长时间超过该时间会认为该 consumer 离线触发 rebalance 逻辑成功后该 consumer 会被删除。 默认值为 300000[1000INT32_MAX]。
使用 CDC 连接器
CDC 连接器会根据用户设置的并行度进行创建 consumer因此用户根据资源情况合理设置并行度。
订阅结果为 RowData 数据类型示例 CDC Source java {{#include docs/examples/flink/Main.java:cdc_source}}
将订阅结果批量下发到算子的示例 CDC Batch Source java {{#include docs/examples/flink/Main.java:cdc_batch_source}}
订阅结果为自定义数据类型示例 CDC Custom Type java {{#include docs/examples/flink/Main.java:cdc_custom_type_test}}
ResultBean 是自定义的一个内部类其字段名和数据类型与列的名称和数据类型一一对应这样根据 TDengineCdcParams.VALUE_DESERIALIZER 属性对应的反序列化类可以反序列化出 ResultBean 类型的对象。
Sink
Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自不同数据源或算子的数据写入 TDengine。在这一过程中TDengine 所具备的高效写入机制发挥了至关重要的作用有力保障了数据的快速和稳定存储。
Properties 中配置参数如下
TDengineConfigParams.PROPERTY_KEY_USER登录 TDengine 用户名默认值 ‘root’。TDengineConfigParams.PROPERTY_KEY_PASSWORD用户登录密码默认值 ‘taosdata’。TDengineConfigParams.PROPERTY_KEY_DBNAME写入的数据库名称。TDengineConfigParams.TD_SUPERTABLE_NAME写入的超级表名称。接收的数据必须有 tbname 字段确定写入那张子表。TDengineConfigParams.TD_TABLE_NAME写入子表或普通表的表名此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可。TDengineConfigParams.VALUE_DESERIALIZER接收结果集反序列化方法, 如果接收结果集类型是 Flink 的 RowData仅需要设置为 RowData即可。也可继承 TDengineSinkRecordSerializer 并实现 serialize 方法根据 接收的数据类型自定义反序列化方式。TDengineConfigParams.TD_BATCH_SIZE设置一次写入 TDengine 数据库的批大小 | 当到达批的数量后进行写入或是一个checkpoint的时间也会触发写入数据库。TDengineConfigParams.TD_BATCH_MODE接收批量数据当设置为 True 时如果数据来源是 TDengine Source则使用 SourceRecords 泛型类型来创建 TDengineSink 对象若来源是 TDengine CDC则使用 ConsumerRecords 泛型来创建 TDengineSink 对象。TDengineConfigParams.TD_SOURCE_TYPE设置数据来源。 当数据来源是 TDengine Source 是设置为 ‘tdengine_source’, 当来源是 TDengine CDC 设置为 ‘tdengine_cdc’。当配置 TD_BATCH_MODE 为 True 生效。TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。true: 启用false: 不启用。默认为 false。TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。true: 启用false: 不启用。默认为 false。TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔单位毫秒默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。true: 启用false: 不启用。默认为 false。
使用示例
将 power 库的 meters 表的子表数据写入 power_sink 库的 sink_meters 超级表对应的子表中。 Sink RowData java {{#include docs/examples/flink/Main.java:RowDataToSink}}
使用示例
订阅 power 库的 meters 超级表的子表数据写入 power_sink 库的 sink_meters 超级表对应的子表中。 Cdc Sink java {{#include docs/examples/flink/Main.java:CdcRowDataToSink}}
Table SQL
使用 Table SQL 的方式从多个不同的数据源数据库如 TDengine、MySQL、Oracle 等中提取数据后 再进行自定义的算子操作如数据清洗、格式转换、关联不同表的数据等然后将处理后的结果加载到目标数据源如 TDengine、Mysql 等中。
Table Source 连接器
参数配置说明
参数名称类型参数说明connectorstring连接器标识设置 tdengine-connector 。td.jdbc.urlstring连接的 url 。td.jdbc.modestrng连接器类型, 设置 source, sink。table.namestring原表或目标表名称。scan.querystring获取数据的 SQL 语句。sink.db.namestring目标数据库名称。sink.supertable.namestring写入的超级表名称。sink.batch.sizeinteger写入的批大小。sink.table.namestring写入的普通表或子表名称。
使用示例
将 power 库的 meters 表的子表数据写入 power_sink 库的 sink_meters 超级表对应的子表中。 Table Source java {{#include docs/examples/flink/Main.java:source_table}}
Table CDC 连接器
参数配置说明
参数名称类型参数说明connectorstring连接器标识设置 tdengine-connector。userstring用户名 默认 root。passwordstring密码 默认taosdata。bootstrap.serversstring服务器地址。topicstring订阅主题。td.jdbc.modestrng连接器类型, cdc, sink。group.idstring消费组 ID同一消费组共享消费进度。auto.offset.resetstring消费组订阅的初始位置。earliest: 从头开始订阅 latest: 仅从最新数据开始订阅。 默认 latest。poll.interval_msinteger拉取数据间隔, 默认 500ms。sink.db.namestring目标数据库名称。sink.supertable.namestring写入的超级表名称。sink.batch.sizeinteger写入的批大小。sink.table.namestring写入的普通表或子表名称。
使用示例
订阅 power 库的 meters 超级表的子表数据写入 power_sink 库的 sink_meters 超级表对应的子表中。 Table CDC java {{#include docs/examples/flink/Main.java:cdc_table}}