个人网站平台,sem网站推广怎么做,wordpress 能装windows,删除wordpress主体Debezium日常分享系列之#xff1a;Debezium Engine 依赖打包项目在代码中输出消息格式消息转换消息转换谓词高级记录使用引擎属性异步引擎属性数据库模式历史属性处理故障 Debezium连接器通常通过部署到Kafka Connect服务来运行#xff0c;并配置一个或多个连接器来监视上游… Debezium日常分享系列之Debezium Engine 依赖打包项目在代码中输出消息格式消息转换消息转换谓词高级记录使用引擎属性异步引擎属性数据库模式历史属性处理故障 Debezium连接器通常通过部署到Kafka Connect服务来运行并配置一个或多个连接器来监视上游数据库并为上游数据库中的所有更改生成数据变更事件。这些数据变更事件被写入Kafka可以由许多不同的应用程序独立消费。Kafka Connect提供了出色的容错性和可伸缩性因为它作为分布式服务运行并确保所有注册和配置的连接器始终运行。例如即使集群中的一个Kafka Connect端点关闭剩余的Kafka Connect端点也会重新启动之前在已终止端点上运行的任何连接器从而最大程度地减少停机时间并消除管理活动。
并非每个应用程序都需要这种级别的容错性和可靠性他们可能不想依赖外部的Kafka代理和Kafka Connect服务。相反一些应用程序更愿意直接在应用程序空间中嵌入Debezium连接器。它们仍然需要相同的数据变更事件但更希望连接器直接将其发送到应用程序而不是在Kafka中持久化。 这个debezium-api模块定义了一个小的API允许应用程序使用Debezium Engine轻松配置和运行Debezium连接器。
从2.6.0版本开始Debezium提供了两个DebeziumEngine接口的实现。较旧的EmbeddedEngine实现运行一个只使用一个任务的连接器。连接器按顺序发出所有记录。这是默认的实现。
从2.6.0版本开始还提供了一个新的AsyncEmbeddedEngine实现。这个实现也只运行一个连接器但它可以在多个线程中处理记录并运行多个任务如果连接器支持的话目前只有SQL Server和MongoDB的连接器支持在一个连接器中运行多个任务。由于这两个引擎实现了相同的接口并共享相同的API下面的代码示例对于任何引擎都是有效的。这两个实现支持相同的配置选项。
然而新的AsyncEmbeddedEngine提供了一些用于设置和优化并行处理的新配置选项。
依赖
要使用Debezium Engine模块将debezium-api模块添加到应用程序的依赖项中。还应将debezium-embedded模块添加到依赖项中这是该API的一个开箱即用的实现。对于Maven这需要将以下内容添加到应用程序的POM文件中
dependencygroupIdio.debezium/groupIdartifactIddebezium-api/artifactIdversion${version.debezium}/version
/dependency
dependencygroupIdio.debezium/groupIdartifactIddebezium-embedded/artifactIdversion${version.debezium}/version
/dependency其中${version.debezium}可以是您使用的Debezium版本也可以是一个包含Debezium版本字符串的Maven属性的值。
同样为您的应用程序将使用的每个Debezium连接器添加依赖项。例如可以将以下内容添加到您的应用程序的Maven POM文件中以便您的应用程序可以使用MySQL连接器
dependencygroupIdio.debezium/groupIdartifactIddebezium-connector-mysql/artifactIdversion${version.debezium}/version
/dependency或者对于 MongoDB 连接器
dependencygroupIdio.debezium/groupIdartifactIddebezium-connector-mongodb/artifactIdversion${version.debezium}/version
/dependency本文档的其余部分介绍了如何在应用程序中嵌入 MySQL 连接器。其他连接器的使用方式类似但连接器特定的配置、主题和事件除外。
打包项目
Debezium使用SPI通过ServiceLoader加载实现。实现可以基于连接器类型也可以是自定义实现。
有些接口有多个实现。例如io.debezium.snapshot.spi.SnapshotLock在核心中有一个默认实现并且针对每个连接器有特定的实现。为了确保Debezium可以定位所需的实现必须显式地配置构建工具以合并META-INF/services文件。
例如如果使用的是Maven shade插件请添加ServicesResourceTransformer转换器如下例所示
...
configurationtransformers...transformer implementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer /.../transformers
...
/configuration或者如果您使用 Maven Assembly 插件则可以使用 metaInf-services 容器描述符处理程序。
在代码中
您的应用程序需要为每个要运行的连接器实例设置一个嵌入式引擎。io.debezium.engine.DebeziumEngineR类作为一个易于使用的包装器完全管理连接器的生命周期。您可以使用它的构建器API创建DebeziumEngine实例提供以下内容
您希望以哪种格式接收消息例如JSON、Avro或Kafka Connect SourceRecord见链接配置属性可能从属性文件中加载用于定义引擎和连接器的环境一个方法该方法将被调用以处理连接器产生的每个数据变更事件
以下是一个配置和运行嵌入式引擎的示例代码
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props new Properties();
props.setProperty(name, engine);
props.setProperty(connector.class, io.debezium.connector.mysql.MySqlConnector);
props.setProperty(offset.storage, org.apache.kafka.connect.storage.FileOffsetBackingStore);
props.setProperty(offset.storage.file.filename, /path/to/storage/offsets.dat);
props.setProperty(offset.flush.interval.ms, 60000);
/* begin connector properties */
props.setProperty(database.hostname, localhost);
props.setProperty(database.port, 3306);
props.setProperty(database.user, mysqluser);
props.setProperty(database.password, mysqlpw);
props.setProperty(database.server.id, 85744);
props.setProperty(topic.prefix, my-app-connector);
props.setProperty(schema.history.internal, io.debezium.storage.file.history.FileSchemaHistory);
props.setProperty(schema.history.internal.file.filename, /path/to/storage/schemahistory.dat);// Create the engine with this configuration ...
try (DebeziumEngineChangeEventString, String engine DebeziumEngine.create(Json.class).using(props).notifying(record - {System.out.println(record);}).build()) {// Run the engine asynchronously ...ExecutorService executor Executors.newSingleThreadExecutor();executor.execute(engine);// Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished让我们更详细地研究这段代码从我们在这里重复的前几行开始
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props new Properties();
props.setProperty(name, engine);
props.setProperty(connector.class, io.debezium.connector.mysql.MySqlConnector);
props.setProperty(offset.storage, org.apache.kafka.connect.storage.FileOffsetBackingStore);
props.setProperty(offset.storage.file.filename, /path/to/storage/offsets.dat);
props.setProperty(offset.flush.interval.ms, 60000);这将创建一个新的标准Properties对象用于设置引擎所需的几个字段无论使用哪个连接器。第一个字段是引擎的名称它将在连接器产生的源记录和内部状态中使用因此在应用程序中使用一些有意义的名称。 connector.class字段定义了扩展Kafka Connect org.apache.kafka.connect.source.SourceConnector抽象类的类名在此示例中我们指定了Debezium的MySqlConnector类。
当Kafka Connect连接器运行时它会从源中读取信息并定期记录定义了它已经处理了多少信息的偏移量。如果连接器重新启动它将使用最后记录的偏移量来确定在源信息中应该从哪里恢复读取。由于连接器不知道也不关心偏移量的存储方式因此引擎需要提供一种存储和恢复这些偏移量的方式。我们的配置的下几个字段指定了我们的引擎应该使用FileOffsetBackingStore类将偏移量存储在本地文件系统上的/path/to/storage/offset.dat文件中文件可以任意命名和存储在任何位置。此外尽管连接器在生成每个源记录时记录偏移量但引擎会定期将偏移量刷新到后备存储在我们的示例中每分钟刷新一次。这些字段可以根据您的应用程序需要进行调整。
接下来的几行定义了特定于连接器的字段在每个连接器文档中有记录在我们的示例中是MySqlConnector连接器 /* begin connector properties */props.setProperty(database.hostname, localhost)props.setProperty(database.port, 3306)props.setProperty(database.user, mysqluser)props.setProperty(database.password, mysqlpw)props.setProperty(database.server.id, 85744)props.setProperty(topic.prefix, my-app-connector)props.setProperty(schema.history.internal, io.debezium.storage.file.history.FileSchemaHistory)props.setProperty(schema.history.internal.file.filename, /path/to/storage/schemahistory.dat)在这里我们设置了MySQL数据库服务器运行的主机机器的名称和端口号并定义了将用于连接到MySQL数据库的用户名和密码。请注意对于MySQL用户名和密码应对应于已被授予以下MySQL权限的MySQL数据库用户
SELECTRELOADSHOW DATABASESREPLICATION SLAVEREPLICATION CLIENT
在读取数据库的一致快照时需要前三个权限。最后两个权限允许数据库读取通常用于MySQL复制的服务器的binlog。
该配置还包括一个用于MySQL的数值标识符。由于MySQL的binlog是MySQL复制机制的一部分因此为了读取binlogMySqlConnector实例必须加入MySQL服务器组这意味着该服务器ID必须是1到232-1之间的任意整数。在我们的代码中我们将其设置为一个相当大但有些随机的值仅供我们的应用程序使用。
该配置还指定了MySQL服务器的逻辑名称。连接器将此逻辑名称包含在其生成的每个源记录的主题字段中使您的应用程序能够区分这些记录的来源。我们的示例使用了一个名为products的服务器名称这可能是因为数据库包含产品信息。当然您可以为您的应用程序命名任何有意义的名称。
当MySqlConnector类运行时它会读取MySQL服务器的binlog其中包括对由服务器托管的数据库所做的所有数据更改和模式更改。由于所有数据更改都是基于拥有表格的模式结构化的因此连接器需要跟踪所有模式更改以便可以正确解码更改事件。连接器记录模式信息以便如果连接器重新启动并恢复从最后记录的偏移量读取它知道该偏移量时数据库模式的确切外观。连接器如何记录数据库模式历史记录在我们的配置的最后两个字段中定义即我们的连接器应该使用FileSchemaHistory类将数据库模式历史更改存储在本地文件系统上的/path/to/storage/schemahistory.dat文件中同样此文件可以任意命名和存储在任何位置。
最后使用build()方法构建不可变配置。顺便说一下我们可以使用Configuration.read(…)方法之一从属性文件中读取配置而不是通过编程方式构建它。
现在我们有了一个配置我们可以创建引擎。以下是相关的代码行
// Create the engine with this configuration ...
try (DebeziumEngineChangeEventString, String engine DebeziumEngine.create(Json.class).using(props).notifying(record - {System.out.println(record);}).build()) {
}所有的更改事件都将传递给给定的处理方法该方法必须与java.util.function.ConsumerR函数接口的签名匹配其中R必须与调用create()时指定的格式类型匹配。请注意您的应用程序的处理函数不应抛出任何异常如果抛出异常引擎将记录方法抛出的任何异常并继续处理下一个源记录但您的应用程序将没有机会处理导致异常的特定源记录这意味着您的应用程序可能与数据库不一致。
此时我们有一个已配置并准备运行的DebeziumEngine对象但它什么也不做。DebeziumEngine设计为由Executor或ExecutorService异步执行
// Run the engine asynchronously ...
ExecutorService executor Executors.newSingleThreadExecutor();
executor.execute(engine);// Do something else or wait for a signal or an event您的应用程序可以通过调用其 close() 方法来安全、优雅地停止引擎
// At some later time ...
engine.close();或者由于引擎支持Closeable接口当离开try块时它将被自动调用。 引擎的连接器将停止从源系统读取信息将所有剩余的更改事件转发给处理函数并将最新的偏移量刷新到偏移量存储中。只有在所有这些操作完成后引擎的run()方法才会返回。如果您的应用程序需要在退出之前等待引擎完全停止您可以使用ExecutorService的shutdown和awaitTermination方法来实现
try {executor.shutdown();while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {logger.info(Waiting another 5 seconds for the embedded engine to shut down);}
}
catch ( InterruptedException e ) {Thread.currentThread().interrupt();
}或者您可以在创建DebeziumEngine时注册CompletionCallback作为回调函数以便在引擎终止时得到通知。
请记住当JVM关闭时它只会等待非守护线程。因此当您在守护线程上运行引擎时如果您的应用程序退出请确保等待引擎进程完成。
为了确保优雅和完全的关闭并确保每个源记录仅发送一次到应用程序您的应用程序应始终正确停止引擎。例如不要依赖于关闭ExecutorService因为这会中断运行的线程。虽然当线程被中断时DebeziumEngine确实会终止但引擎可能无法完全终止并且当您的应用程序重新启动时它可能会看到在关闭之前处理的一些相同的源记录。
正如前面提到的DebeziumEngine接口有两个实现。这两个实现使用相同的API前面的代码示例对两个版本都有效。唯一的例外是创建DebeziumEngine实例。正如在介绍中提到的默认情况下使用EmbeddedEngine实现。因此DebeziumEngine.create(Json.class)方法在内部使用EmbeddedEngine实例。
如果您想使用新的AsyncEmbeddedEngine实例可以使用以下方法DebeziumEngine#create(KeyValueHeaderChangeEventFormatK, V, H format, String builderFactory)
例如要创建一个使用AsyncEmbeddedEngine并以JSON作为其键、值和标头格式的嵌入式引擎您可以使用以下代码
try (DebeziumEngineChangeEventString, String engine DebeziumEngine.create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory).using(props).notifying(record - {System.out.println(record);}).build()) {// Also run the engine asynchronously ...ExecutorService executor Executors.newSingleThreadExecutor();executor.execute(engine);// Do something else or wait for a signal or an event
}输出消息格式
DebeziumEngine#create()可以接受多个不同的参数这些参数会影响消息被消费者接收的格式。允许的值为
Connect.class - 输出值是包装Kafka Connect的SourceRecord的变更事件Json.class - 输出值是键和值对编码为JSON字符串JsonByteArray.class - 输出值是键和值对格式化为JSON并编码为UTF-8字节数组Avro.class - 输出值是以Avro序列化记录编码的键和值对CloudEvents.class - 输出值是编码为 消息的键和值对
在调用DebeziumEngine#create()时也可以指定标头格式。允许的值为
Json.class - 标头值被编码为JSON字符串JsonByteArray.class - 标头值被格式化为JSON并编码为UTF-8字节数组
在内部引擎将数据转换委托给Kafka Connect或Apicurio转换器实现使用最适合执行转换的算法。可以使用引擎属性对转换器进行参数化以修改其行为。JSON输出格式的示例
final Properties props new Properties();
...
props.setProperty(converter.schemas.enable, false); // dont include schema in message
...
final DebeziumEngineChangeEventString, String engine DebeziumEngine.create(Json.class).using(props).notifying((records, committer) - {for (ChangeEventString, String r : records) {System.out.println(Key r.key() value r.value() );committer.markProcessed(r);}
...其中 ChangeEvent 数据类型是键/值对。
消息转换
在将消息传递给处理程序之前可以通过Kafka Connect的简单消息转换SMT管道运行它们。每个SMT可以将消息保持不变、修改消息或过滤消息。使用属性transforms配置链。属性包含要应用的转换的逗号分隔的逻辑名称列表。然后属性transforms.logical_name.type为每个转换定义了实现类的名称transforms.logical_name.*配置选项将传递给转换。 配置示例
final Properties props new Properties();
...
props.setProperty(transforms, filter, router); // (1)
props.setProperty(transforms.router.type, org.apache.kafka.connect.transforms.RegexRouter); // (2)
props.setProperty(transforms.router.regex, (.*)); // (3)
props.setProperty(transforms.router.replacement, trf$1); // (3)
props.setProperty(transforms.filter.type, io.debezium.embedded.ExampleFilterTransform); // (4)定义了两个转换 - 过滤器和路由器
路由器转换的实现是 org.apache.kafka.connect.transforms.RegexRouter
路由器转换有两个配置选项 - 正则表达式和替换
过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform
消息转换谓词
谓词可以应用于转换以使转换成为可选的。
配置示例如下
final Properties props new Properties();
...
props.setProperty(transforms, filter); // (1)
props.setProperty(predicates, headerExists); // (2)
props.setProperty(predicates.headerExists.type, org.apache.kafka.connect.transforms.predicates.HasHeaderKey); //(3)
props.setProperty(predicates.headerExists.name, header.name); // (4)
props.setProperty(transforms.filter.type, io.debezium.embedded.ExampleFilterTransform);// (5)
props.setProperty(transforms.filter.predicate, headerExists); // (6)
props.setProperty(transforms.filter.negate, true); 定义了一个转换 - 过滤器
定义了一个谓词 - headerExists
headerExists 谓词的实现是 org.apache.kafka.connect.transforms.predicates.HasHeaderKey
headerExists 谓词有一个配置选项 - name
过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform
过滤器转换需要谓词 headerExists
过滤器转换期望谓词的值被否定从而使谓词确定标头是否不存在
高级记录使用
对于某些用例例如尝试批量写入记录或针对异步 API 时上面描述的功能接口可能具有挑战性。在这些情况下使用 io.debezium.engine.DebeziumEngine.ChangeConsumer. 接口可能会更容易。
此接口具有单个函数其签名如下
/*** Handles a batch of records, calling the {link RecordCommitter#markProcessed(Object)}* for each record and {link RecordCommitter#markBatchFinished()} when this batch is finished.* param records the records to be processed* param committer the committer that indicates to the system that we are finished*/void handleBatch(ListR records, RecordCommitterR committer) throws InterruptedException;如Javadoc中所提到的RecordCommitter对象将在每个记录和每个批次完成时被调用。RecordCommitter接口是线程安全的这允许对记录进行灵活的处理。
您可以选择重写已处理的记录的偏移量。这可以通过首先调用RecordCommitter#buildOffsets()构建一个新的Offsets对象使用Offsets#set(String key, Object value)更新偏移量然后使用更新后的Offsets调用RecordCommitter#markProcessed(SourceRecord record, Offsets sourceOffsets)来完成。
要使用ChangeConsumer API您必须将接口的实现传递给通知API如下所示
class MyChangeConsumer implements DebeziumEngine.ChangeConsumerRecordChangeEventSourceRecord {public void handleBatch(ListRecordChangeEventSourceRecord records, RecordCommitterRecordChangeEventSourceRecord committer) throws InterruptedException {...}
}
// Create the engine with this configuration ...
DebeziumEngineRecordChangeEventSourceRecord engine DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(props).notifying(new MyChangeConsumer()).build();如果使用 JSON 格式等效格式也适用于其他格式则代码将如下所示
class JsonChangeConsumer implements DebeziumEngine.ChangeConsumerChangeEventString, String {public void handleBatch(ListChangeEventString, String records,RecordCommitterChangeEventString, String committer) throws InterruptedException {...}
}
// Create the engine with this configuration ...
DebeziumEngineChangeEventString, String engine DebeziumEngine.create(Json.class).using(props).notifying(new JsonChangeConsumer()).build();引擎属性
除非有默认值否则以下配置属性是必需的为了文本格式化Java 类的包名称被替换为 …。
属性默认值描述name连接器实例的唯一名称。connector.class连接器的 Java 类的名称offset.storage负责连接器偏移持久性的 Java 类的名称。offset.storage.file.filename存储偏移量的文件的路径。offset.storage.topic要存储偏移量的 Kafka 主题的名称。offset.storage.partitions创建偏移量存储主题时使用的分区数。offset.storage.replication.factor创建偏移存储主题时使用的复制因子。offset.commit.policy提交策略的 Java 类的名称。它根据处理的事件数和自上次提交以来经过的时间定义何时触发偏移提交。默认是基于时间间隔的定期提交策略。offset.flush.interval.ms60000尝试提交偏移的间隔。默认值为 1 分钟。offset.flush.timeout.ms5000在取消该过程并恢复将来尝试提交的偏移数据之前等待记录刷新和分区要提交到偏移存储的最大毫秒数。默认值为 5 秒。errors.max.retries-1失败前连接错误的最大重试次数-1 无限制0 禁用 0 重试次数。errors.retry.delay.initial.ms300遇到连接错误时重试的初始延迟以毫秒为单位。每次重试时此值将加倍但不会超过 errors.retry.delay.max.ms。errors.retry.delay.max.ms10000遇到连接错误时重试之间的最大延迟以毫秒为单位。
异步引擎属性
属性默认值描述record.processing.threads根据工作负载和可用 CPU 核心数按需分配线程。可用于处理更改事件记录的线程数。如果未指定任何值默认值则引擎将使用 Java ThreadPoolExecutor 根据当前工作负载动态调整线程数。最大线程数是给定计算机上的 CPU 核心数。如果指定了值则引擎将使用 Java 固定线程池方法创建具有指定线程数的线程池。要使用给定计算机上的所有可用核心请设置占位符值 AVAILABLE_CORES。record.processing.shutdown.timeout.ms1000调用任务关闭后等待处理已提交记录的最长时间以毫秒为单位。record.processing.orderORDERED确定应如何生成记录。ORDERED记录按顺序处理也就是说它们按从数据库获取的顺序生成。UNORDERED记录按非顺序处理也就是说它们可以按与源数据库不同的顺序生成。UNORDERED 选项的非顺序处理可实现更好的吞吐量因为记录在任何 SMT 处理和消息序列化完成后立即生成而无需等待其他记录。当向引擎提供 ChangeConsumer 方法时此选项不起作用。record.processing.with.serial.consumerfalse指定是否应从提供的 Consumer 创建默认的 ChangeConsumer从而导致串行 Consumer 处理。如果您在使用 API 创建引擎时指定了 ChangeConsumer 接口则此选项无效。task.management.timeout.ms180,000 (3 min)引擎等待任务生命周期管理操作启动和停止完成的时间以毫秒为单位。
数据库模式历史属性
一些连接器还需要一组额外的属性来配置数据库模式历史记录
MySQLSQL ServerOracleDb2
如果没有正确配置数据库模式历史记录则连接器将拒绝启动。默认配置需要可用的Kafka集群。对于其他部署可使用基于文件的数据库模式历史记录存储实现。
属性默认值描述schema.history.internal负责持久保存数据库模式历史的 Java 类的名称。schema.history.internal.file.filename存储数据库架构历史记录的文件的路径。schema.history.internal.kafka.topic存储数据库架构历史记录的 Kafka 主题。schema.history.internal.kafka.bootstrap.servers要连接的 Kafka 集群服务器的初始列表。集群提供用于存储数据库架构历史记录的主题。
处理故障
当引擎执行时其连接器会主动记录每个源记录中的源偏移并且引擎会定期将这些偏移刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时重新启动后引擎及其连接器将从最后记录的偏移处恢复读取源信息。
那么当嵌入式引擎正在运行时应用程序发生故障会发生什么结果是在重新启动后应用程序很可能会收到一些之前在崩溃之前已经处理过的源记录。这取决于引擎多久将偏移刷新到其存储中通过offset.flush.interval.ms属性以及特定连接器在一个批次中返回多少个源记录。最理想的情况是每次都刷新偏移量例如将offset.flush.interval.ms设置为0但即使这样嵌入式引擎仍然只会在从连接器接收到每个源记录批次后刷新偏移量。 例如MySQL连接器使用max.batch.size来指定批次中可能出现的源记录的最大数量。即使将offset.flush.interval.ms设置为0当应用程序在崩溃后重新启动时可能会看到最多n个重复记录其中n是批次的大小。如果将offset.flush.interval.ms属性设置得更高则应用程序可能会看到最多n * m个重复记录其中n是批次的最大大小m是在单个偏移刷新间隔期间可能累积的批次数。显然可以将嵌入式连接器配置为不进行批处理并始终刷新偏移量从而使应用程序永远不会接收到任何重复的源记录。但是这会大大增加开销并降低连接器的吞吐量。
总的来说当使用嵌入式连接器时应用程序在正常操作期间包括在正常关闭后重新启动将仅接收到每个源记录一次但在崩溃或不正确关闭后重新启动后需要容忍接收到重复事件。如果应用程序需要更严格的确切一次性行为那么应该使用完整的Debezium平台该平台可以提供确切一次性保证即使在崩溃和重新启动后。