I. Context
In "Kafka-Connect自带示例" (the Kafka Connect built-in example) we tried starting a producer and a consumer with zero configuration to produce and consume data. So how is this implemented internally? Let's lift the veil by reading the source code.
II. What are the entry classes?
From the startup script connect-standalone.sh we can see that the entry class is ConnectStandalone:
# ......(omitted)......
exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectStandalone "$@"
In addition, we passed the source and sink config files as arguments. From the source file we can see the entry class is FileStreamSource, and from the sink file the entry class is FileStreamSink:

FileStreamSource corresponds to FileStreamSourceConnector in the source code;
FileStreamSink corresponds to FileStreamSinkConnector.
connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt
topic=connect-test

connect-file-sink.properties:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/sink.txt
topics=connect-test

III. ConnectStandalone
It runs Kafka Connect as a standalone process. In this mode, connectors and tasks are not distributed across workers; instead, all of the normal Connect machinery runs inside a single process. This is suitable for ad hoc, small, or experimental jobs.

In this mode, connector and task configurations are stored in memory and are not persistent. Connector offset data, however, is persistent, because it uses file storage (configurable via offset.storage.file.filename).
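For reference, a minimal standalone worker config might look like the sketch below; the bootstrap server and file path here are placeholders, not values from the example:

# connect-standalone.properties (illustrative sketch)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# standalone mode persists connector offsets to a local file
offset.storage.file.filename=/tmp/connect.offsets
# how often offsets are flushed, in milliseconds
offset.flush.interval.ms=10000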
public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
    public static void main(String[] args) {
        ConnectStandalone connectStandalone = new ConnectStandalone(args);
        // Delegates to run() on the parent class
        connectStandalone.run();
    }
}

// AbstractConnectCli is used here; it contains the common initialization logic for Kafka Connect.
public abstract class AbstractConnectCli<T extends WorkerConfig> {
    // Validates the first CLI argument (the worker properties file) and starts Connect
    public void run() {
        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
            log.info("Usage: {}", usage());
            Exit.exit(1);
        }
        try {
            // The connect-standalone.properties config file
            String workerPropsFile = args[0];
            // Turn everything configured in connect-standalone.properties into a map
            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
            // Holds the arguments that follow connect-standalone.properties,
            // e.g. connect-file-source.properties and connect-file-sink.properties
            String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
            // Start Connect
            Connect connect = startConnect(workerProps, extraArgs);
            // Shutdown will be triggered by Ctrl-C or via an HTTP shutdown request
            connect.awaitStop();
        } catch (Throwable t) {
            log.error("Stopping due to error", t);
            Exit.exit(2);
        }
    }
}
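Tying this back to the command line: args[0] is the worker properties file, and everything after it ends up in extraArgs. Assuming all three files are in the current directory, the invocation from the example looks like:

connect-standalone.sh connect-standalone.properties \
    connect-file-source.properties connect-file-sink.properties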
To summarize, run() does two things:
1. Initialize the configuration from connect-standalone.properties;
2. Start Connect.
So what exactly is Connect, and what does it do? Let's take a look.
1. Starting Connect
public abstract class AbstractConnectCli<T extends WorkerConfig> {
    public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {
        // Kafka Connect worker process initialization
        log.info("Kafka Connect worker initializing ...");
        long initStart = time.hiResClockMs();
        WorkerInfo initInfo = new WorkerInfo();
        initInfo.logAll();

        // Scanning for plugin classes; this might take a moment
        log.info("Scanning for plugin classes. This might take a moment ...");
        Plugins plugins = new Plugins(workerProps);
        plugins.compareAndSwapWithDelegatingLoader();
        T config = createConfig(workerProps);
        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());

        RestClient restClient = new RestClient(config);
        ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
        restServer.initializeServer();
        URI advertisedUrl = restServer.advertisedUrl();
        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

        // connector.client.config.override.policy, default "All".
        // Class name or alias of a ConnectorClientConfigOverridePolicy implementation.
        // Defines which client configs a connector may override. The default, "All",
        // means connector configs can override all client properties. Other policies
        // shipped with the framework are "None" (forbid connectors from overriding
        // client properties) and "Principal" (allow connectors to override only the
        // client principal).
        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                config, ConnectorClientConfigOverridePolicy.class);

        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
        final Connect connect = new Connect(herder, restServer);
        // Kafka Connect worker initialization is complete
        log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);

        try {
            connect.start();
        } catch (Exception e) {
            log.error("Failed to start Connect", e);
            connect.stop();
            Exit.exit(3);
        }

        // Processes the remaining arguments in turn, e.g. connect-file-source.properties
        // and connect-file-sink.properties
        processExtraArgs(herder, connect, extraArgs);
        return connect;
    }
}
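Note that startConnect() also brings up the Connect REST server. Assuming the default listener port 8083 has not been overridden, you can confirm the worker is up and list the connectors with:

curl http://localhost:8083/connectors
# should list local-file-source and local-file-sink once both are created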
You can think of it this way: connect-standalone.properties is used to start Connect itself, connect-file-source.properties is used to start the source task, and connect-file-sink.properties is used to start the sink task. The source and sink tasks are created inside processExtraArgs(herder, connect, extraArgs), which involves three parameters:
1. Herder ("herdsman"): an instance that can be used to perform operations on a Connect cluster; it holds the Worker.
2. Connect: the class that ties together all the components of a Kafka Connect process (herder, worker, storage, command interface) and manages their lifecycle.
3. extraArgs: the arguments that configure the source and sink tasks.
2. Starting the source and sink tasks
It's easier to follow against the concrete arguments connect-file-source.properties and connect-file-sink.properties:

protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
    try {
        // Parse the config files one at a time.
        // With the official example, this parses connect-file-source.properties,
        // then connect-file-sink.properties.
        for (final String connectorConfigFile : extraArgs) {
            CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);
            FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
                if (error != null)
                    log.error("Failed to create connector for {}", connectorConfigFile);
                else
                    log.info("Created connector {}", info.result().name());
            });
            // Hand each parsed config file to the herder
            herder.putConnectorConfig(createConnectorRequest.name(), createConnectorRequest.config(),
                    createConnectorRequest.initialTargetState(),
                    false, cb);
            // Future.get() blocks until the task completes, then returns its result,
            // so the connectors are created one after another.
            cb.get();
        }
    } catch (Throwable t) {
        // .....
    }
}
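parseConnectorConfigurationFile() is essentially a properties-file loader. Here is a conceptual, self-contained sketch of what it boils down to; this is not the actual Kafka implementation, and the class and method names are ours:

import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConnectorConfigFileSketch {
    // Load a connector .properties file into a String map; the "name" key
    // (e.g. local-file-source) identifies the connector.
    public static Map<String, String> parse(String path) throws Exception {
        Properties props = new Properties();
        try (FileReader fileReader = new FileReader(path)) {
            props.load(fileReader);
        }
        Map<String, String> config = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            config.put(key, props.getProperty(key));
        }
        return config;
    }
}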
Next, let's trace connect-file-source.properties and connect-file-sink.properties through the herder to see how it gets the tasks running. Each config file corresponds to one connector. We'll analyze StandaloneHerder first:
public class StandaloneHerder extends AbstractHerder {
    private final ScheduledExecutorService requestExecutorService;

    StandaloneHerder(Worker worker,
                     String workerId,
                     String kafkaClusterId,
                     StatusBackingStore statusBackingStore,
                     MemoryConfigBackingStore configBackingStore,
                     ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                     Time time) {
        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy, time);
        this.configState = ClusterConfigState.EMPTY;
        // A single-threaded executor that can schedule commands to run after a delay, or periodically
        this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
        configBackingStore.setUpdateListener(new ConfigUpdateListener());
    }

    public void putConnectorConfig(final String connName, final Map<String, String> config, final TargetState targetState,
                                   final boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) {
        try {
            validateConnectorConfig(config, (error, configInfos) -> {
                if (error != null) {
                    callback.onCompletion(error, null);
                    return;
                }
                // Submit the source or sink ConnectorConfig to the executor
                requestExecutorService.submit(
                        () -> putConnectorConfig(connName, config, targetState, allowReplace, callback, configInfos));
            });
        } catch (Throwable t) {
            callback.onCompletion(t, null);
        }
    }

    private synchronized void putConnectorConfig(String connName,
                                                 final Map<String, String> config,
                                                 TargetState targetState,
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback,
                                                 ConfigInfos configInfos) {
        try {
            // ........
            // Write this connector config (and optionally the target state) to persistent storage,
            // wait for it to be acknowledged, then read it back by consuming the Kafka log.
            // If the worker is configured to write to the config topic with a fencable producer,
            // claimWritePrivileges() must have been called successfully before this method.
            configBackingStore.putConnectorConfig(connName, config, targetState);
            startConnector(connName, (error, result) -> {
                if (error != null) {
                    callback.onCompletion(error, null);
                    return;
                }
                // Submit the task to the executor
                requestExecutorService.submit(() -> {
                    updateConnectorTasks(connName);
                    callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));
                });
            });
        } catch (Throwable t) {
            callback.onCompletion(t, null);
        }
    }

    private synchronized void updateConnectorTasks(String connName) {
        // ......
        // Compute the connector's task configs; this is where connector.class
        // from the config file comes into play
        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
        List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
        if (taskConfigsChanged(configState, connName, rawTaskConfigs)) {
            removeConnectorTasks(connName);
            configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
            createConnectorTasks(connName);
        }
    }

    private void createConnectorTasks(String connName) {
        List<ConnectorTaskId> taskIds = configState.tasks(connName);
        createConnectorTasks(connName, taskIds);
    }

    private void createConnectorTasks(String connName, Collection<ConnectorTaskId> taskIds) {
        Map<String, String> connConfigs = configState.connectorConfig(connName);
        for (ConnectorTaskId taskId : taskIds) {
            // Start each configured task (source and sink) in turn
            startTask(taskId, connConfigs);
        }
    }

    private boolean startTask(ConnectorTaskId taskId, Map<String, String> connProps) {
        switch (connectorType(connProps)) {
            case SINK:
                return worker.startSinkTask(
                        taskId, configState, connProps, configState.taskConfig(taskId),
                        this, configState.targetState(taskId.connector()));
            case SOURCE:
                return worker.startSourceTask(
                        taskId, configState, connProps, configState.taskConfig(taskId),
                        this, configState.targetState(taskId.connector()));
            default:
                throw new ConnectException("Failed to start task " + taskId + " since it is not a recognizable type (source or sink)");
        }
    }
}
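A note on recomputeTaskConfigs(connName): under the hood it asks the connector instance for its task configs via Connector.taskConfigs(maxTasks). As a rough, hypothetical illustration of that contract (a file connector such as FileStreamSourceConnector only ever yields one task config, since a single file cannot be split across tasks):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {
    // Illustrative only: expand a connector config into per-task configs.
    public static List<Map<String, String>> taskConfigs(Map<String, String> connectorConfig, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        // A file connector effectively ignores tasks.max and produces exactly one
        // task config, essentially a copy of the connector config.
        configs.add(new HashMap<>(connectorConfig));
        return configs;
    }
}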
The herder uses the Worker to start the configured SourceTask and SinkTask. In the end they both go through the same method; only the task builder differs. Let's continue the analysis.
The Worker starts a SourceTask:

public boolean startSourceTask(...) {
    return startTask(id, connProps, taskProps, configState, statusListener,
            new SourceTaskBuilder(id, configState, statusListener, initialState));
}

The Worker starts a SinkTask:

public boolean startSinkTask(...) {
    return startTask(id, connProps, taskProps, configState, statusListener,
            new SinkTaskBuilder(id, configState, statusListener, initialState));
}
Now let's analyze startTask(...) together:

// Thread pool: Executors.newCachedThreadPool()
private final ExecutorService executor;

private boolean startTask(...) {
    // ......
    // Load the class configured via connector.class
    String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
    ClassLoader connectorLoader = plugins.connectorLoader(connType);
    // ......
    final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
    // The corresponding SourceTask or SinkTask
    final Task task = plugins.newTask(taskClass);
    // Depending on the builder that was passed in, workerTask wraps a different Task:
    //   SourceTaskBuilder --> SourceTask
    //   SinkTaskBuilder   --> SinkTask
    workerTask = taskBuilder
            .withTask(task)
            .withConnectorConfig(connConfig)
            .withKeyConverter(keyConverter)
            .withValueConverter(valueConverter)
            .withHeaderConverter(headerConverter)
            .withClassloader(connectorLoader)
            .build();
    workerTask.initialize(taskConfig);
    WorkerTask<?, ?> existing = tasks.putIfAbsent(id, workerTask);
    // Below we'll look at how SourceTask and SinkTask actually execute
    executor.submit(plugins.withClassLoader(connectorLoader, workerTask));
    if (workerTask instanceof WorkerSourceTask) {
        // A source task has a separate job that commits offsets periodically, every 1 min by default
        sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
    }
    return true;
}

For FileStreamSource, the corresponding Task is FileStreamSourceTask;
for FileStreamSink, the corresponding Task is FileStreamSinkTask.

The Worker then invokes the WorkerTask to go produce and consume data.
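Where does TaskConfig.TASK_CLASS_CONFIG come from? Each connector names its task class by overriding Connector.taskClass(); in FileStreamSourceConnector that override is simply:

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}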
3. Scheduling and running WorkerTask
WorkerTask provides the basic methods the Worker uses to manage tasks. Implementations combine the user-specified Task with Kafka to create a data flow. WorkerTask instances are scheduled on a thread pool; let's look at run():
// Only the key code is shown below
abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
    public void run() {
        doRun();
    }

    private void doRun() throws InterruptedException {
        // Initializes the Producer/Consumer settings we put in
        // connect-file-source.properties and connect-file-sink.properties
        doStart();
        // The real work starts here:
        //  - the SourceTask reads data and hands it to the Producer to publish
        //  - the Consumer receives data and hands it to the SinkTask to process
        execute();
    }
}
doStart():

void doStart() {
    retryWithToleranceOperator.reporters(errorReportersSupplier.get());
    initializeAndStart();
    statusListener.onStartup(id);
}
Source and Sink provide different implementations of initializeAndStart().
Source
Here the WorkerTask subclass AbstractWorkerSourceTask is used:
public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord, SourceRecord> {
    protected void initializeAndStart() {
        prepareToInitializeTask();
        offsetStore.start();
        // Flip the started flag to true
        started = true;
        // Initialize this SourceTask with the given context object
        task.initialize(sourceTaskContext);
        // Start the task. This should handle any configuration parsing and one-time setup.
        // This is where FileStreamSourceTask.start() (or whatever SourceTask we configured)
        // actually gets called.
        task.start(taskConfig);
        log.info("{} Source task finished initialization and start", this);
    }
}
public class FileStreamSourceTask extends SourceTask {
    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);
        // name=local-file-source
        // connector.class=FileStreamSource
        // tasks.max=1
        // file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt
        // topic=connect-test

        // file
        filename = config.getString(FileStreamSourceConnector.FILE_CONFIG);
        if (filename == null || filename.isEmpty()) {
            stream = System.in;
            // Tracking offsets for stdin doesn't make sense
            streamOffset = null;
            reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        }
        // topic=connect-test
        topic = config.getString(FileStreamSourceConnector.TOPIC_CONFIG);
        // batch.size
        batchSize = config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);
    }
}
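After start(), the framework calls poll() repeatedly; that is where the file is actually read. Below is a much-simplified sketch of that logic: the real FileStreamSourceTask does buffered, non-blocking reads, tracks a stream offset, and wraps each line in a SourceRecord, and readLines here is our own stand-in name:

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class PollSketch {
    // Simplified: read up to batchSize lines; return null when there is nothing new,
    // which mirrors the contract of SourceTask.poll().
    public static List<String> readLines(BufferedReader reader, int batchSize) throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while (lines.size() < batchSize && (line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines.isEmpty() ? null : lines;
    }
}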
Sink
Here the WorkerTask subclass WorkerSinkTask is used:

protected void initializeAndStart() {
    SinkConnectorConfig.validate(taskConfig);
    // Subscribe to the topics
    if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
        List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
        consumer.subscribe(topics, new HandleRebalance());
        log.debug("{} Initializing and starting task for topics {}", this, String.join(", ", topics));
    } else {
        // topics.regex: subscribe to the topics matched by the configured regular expression
        String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
        Pattern pattern = Pattern.compile(topicsRegexStr);
        consumer.subscribe(pattern, new HandleRebalance());
        log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
    }
    // Initialize the context for this task
    task.initialize(context);
    // Start the task. This should handle any configuration parsing and one-time setup.
    // This is where FileStreamSinkTask.start() (or whatever SinkTask we configured)
    // actually gets called.
    task.start(taskConfig);
    log.info("{} Sink task finished initialization and start", this);
}
public class FileStreamSinkTask extends SinkTask {
    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(FileStreamSinkConnector.CONFIG_DEF, props);
        filename = config.getString(FileStreamSinkConnector.FILE_CONFIG);
        if (filename == null || filename.isEmpty()) {
            outputStream = System.out;
        } else {
            try {
                // Create the output stream for the result file named in our config
                outputStream = new PrintStream(
                        Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
                        false,
                        StandardCharsets.UTF_8.name());
            } catch (IOException e) {
                throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);
            }
        }
    }
}
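The sink-side counterpart is put(): once started, the framework feeds batches of records to it. What FileStreamSinkTask does with them is essentially just printing each record's value, as in this simplified sketch (the real method iterates SinkRecords and also logs):

import java.io.PrintStream;
import java.util.Collection;

public class PutSketch {
    // Simplified: write each record value as one line to the configured stream.
    public static void put(PrintStream outputStream, Collection<Object> recordValues) {
        for (Object value : recordValues) {
            outputStream.println(value);
        }
    }
}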
execute()
For clarity, only the key code is listed here.
Source
public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord, SourceRecord> {
    public void execute() {
        while (!isStopping()) {
            // Calls task.poll(), i.e. reads data from the file
            toSend = poll();
            // This is where producer.send() actually gets called to publish the data
            sendRecords();
        }
    }
}
Sink
class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkRecord> {
    public void execute() {
        while (!isStopping())
            iteration();
    }

    protected void iteration() {
        poll(timeoutMs);
    }

    protected void poll(long timeoutMs) {
        // Pull data back with the consumer
        ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
        // Convert the records and hand them to the user-defined SinkTask to process
        convertMessages(msgs);
        deliverMessages();
    }

    private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));
        return msgs;
    }
}
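With both tasks running, the pipeline can be verified end to end: append a line to the source file and watch it appear in the sink file (paths taken from the example configs above):

echo "hello kafka connect" >> /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt
tail -f /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/sink.txt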