系列文章目录
一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel

文章目录
系列文章目录
TaskGroupContainer
初始化
start方法详细步骤
1、初始化task执行相关的状态信息
2、开始执行任务：while (true)循环
TaskGroupContainer源码

TaskGroupContainer
DataX的TaskGroupContainer是JobContainer将所有task分配到各个TaskGroup之后，用来执行这些task的容器。该容器的主要入口是start方法，该方法主要完成两项工作：初始化task执行相关的状态信息，以及循环检测所有任务的执行状态。此外，TaskGroupContainer还有一个名为reportTaskGroupCommunication的方法，用于汇报状态：它通过containerCommunicator收集当前TaskGroupContainer对应的所有task的通信信息（Communication），将其合并成一个通信信息后再上报。
初始化
构造方法中依次完成：设置配置Configuration；初始化容器通信器（StandaloneTGContainerCommunicator）；设置jobId；设置taskGroupId；设置channel实现类的类名channelClazz；设置task收集器的类名taskCollectorClass。
start方法详细步骤
1、初始化task执行相关的状态信息
taskConfigMap：taskId与其对应Configuration的映射集合；taskQueue：待运行的任务队列；taskFailedExecutorMap：taskId与其上一次运行失败的TaskExecutor的映射；runTasks：正在执行的任务集合；taskStartTimeMap：任务开始时间。
2、开始执行任务while (true)循环
1. 判断task状态：循环遍历所有任务，如果任务尚未完成则跳过；如果任务已经完成，则从正在运行的任务列表中删除。如果任务失败，判断是否支持重试：如果支持重试且重试次数没有超过最大限制，则将其重新加入待运行队列等待重试；否则（或者任务被kill）标记为失败并跳出遍历。
2. 如果发现该taskGroup下有taskExecutor失败或被kill，则汇报错误并抛出异常。
3. 如果有任务未执行，且正在运行的任务数小于最大通道数（channelNumber），则创建TaskExecutor实例，调用doStart真正执行数据同步任务，将该task从待运行列表中删除，同时加入正在运行的队列。TaskExecutor构建时会生成一个reader、一个channel和一个writer，doStart时启动两个线程：reader生产数据并写入channel，writer从channel中读取数据；任务执行完毕时，通过writer将任务状态置为成功。
4. 检查待运行队列和所有任务的状态，如果所有任务都执行成功，则汇报taskGroup的状态并退出循环。
5. 如果距上次汇报的时间已经超过汇报间隔（reportInterval），则立即汇报一次。
6. 所有任务执行完成、从while循环中退出之后，再全局汇报一次当前的任务状态。
TaskGroupContainer源码
/**
* task任务运行容器
**/
public class TaskGroupContainer extends AbstractContainer {private static final Logger LOG LoggerFactory.getLogger(TaskGroupContainer.class);/*** 当前taskGroup所属jobId*/private long jobId;/*** 当前taskGroupId*/private int taskGroupId;/*** 使用的channel类*/private String channelClazz;/*** task收集器使用的类*/private String taskCollectorClass;private TaskMonitor taskMonitor TaskMonitor.getInstance();public TaskGroupContainer(Configuration configuration) {super(configuration);initCommunicator(configuration);this.jobId this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);this.taskGroupId this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);this.channelClazz this.configuration.getString(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);this.taskCollectorClass this.configuration.getString(CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);}private void initCommunicator(Configuration configuration) {super.setContainerCommunicator(new StandaloneTGContainerCommunicator(configuration));}public long getJobId() {return jobId;}public int getTaskGroupId() {return taskGroupId;}Overridepublic void start() {try {/*** 状态check时间间隔较短可以把任务及时分发到对应channel中*/int sleepIntervalInMillSec this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);/*** 状态汇报时间间隔稍长避免大量汇报*/long reportIntervalInMillSec this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,10000);/*** 2分钟汇报一次性能统计*/// 获取channel数目int channelNumber this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);int taskMaxRetryTimes this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);long taskRetryIntervalInMsec this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);long taskMaxWaitInMsec this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);ListConfiguration taskConfigs 
this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);if(LOG.isDebugEnabled()) {LOG.debug(taskGroup[{}]s task configs[{}], this.taskGroupId,JSON.toJSONString(taskConfigs));}int taskCountInThisTaskGroup taskConfigs.size();LOG.info(String.format(taskGroupId[%d] start [%d] channels for [%d] tasks.,this.taskGroupId, channelNumber, taskCountInThisTaskGroup));this.containerCommunicator.registerCommunication(taskConfigs);MapInteger, Configuration taskConfigMap buildTaskConfigMap(taskConfigs); //taskId与task配置ListConfiguration taskQueue buildRemainTasks(taskConfigs); //待运行task列表MapInteger, TaskExecutor taskFailedExecutorMap new HashMapInteger, TaskExecutor(); //taskId与上次失败实例ListTaskExecutor runTasks new ArrayListTaskExecutor(channelNumber); //正在运行taskMapInteger, Long taskStartTimeMap new HashMapInteger, Long(); //任务开始时间long lastReportTimeStamp 0;Communication lastTaskGroupContainerCommunication new Communication();while (true) {//1.判断task状态boolean failedOrKilled false;MapInteger, Communication communicationMap containerCommunicator.getCommunicationMap();for(Map.EntryInteger, Communication entry : communicationMap.entrySet()){Integer taskId entry.getKey();Communication taskCommunication entry.getValue();if(!taskCommunication.isFinished()){continue;}TaskExecutor taskExecutor removeTask(runTasks, taskId);//上面从runTasks里移除了因此对应在monitor里移除taskMonitor.removeTask(taskId);//失败看task是否支持failover重试次数未超过最大限制if(taskCommunication.getState() State.FAILED){taskFailedExecutorMap.put(taskId, taskExecutor);if(taskExecutor.supportFailOver() taskExecutor.getAttemptCount() taskMaxRetryTimes){taskExecutor.shutdown(); //关闭老的executorcontainerCommunicator.resetCommunication(taskId); //将task的状态重置Configuration taskConfig taskConfigMap.get(taskId);taskQueue.add(taskConfig); //重新加入任务列表}else{failedOrKilled true;break;}}else if(taskCommunication.getState() State.KILLED){failedOrKilled true;break;}else if(taskCommunication.getState() State.SUCCEEDED){Long taskStartTime 
taskStartTimeMap.get(taskId);if(taskStartTime ! null){Long usedTime System.currentTimeMillis() - taskStartTime;LOG.info(taskGroup[{}] taskId[{}] is successed, used[{}]ms,this.taskGroupId, taskId, usedTime);//usedTime*1000*1000 转换成PerfRecord记录的ns这里主要是简单登记进行最长任务的打印。因此增加特定静态方法PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,taskStartTime, usedTime * 1000L * 1000L);taskStartTimeMap.remove(taskId);taskConfigMap.remove(taskId);}}}// 2.发现该taskGroup下taskExecutor的总状态失败则汇报错误if (failedOrKilled) {lastTaskGroupContainerCommunication reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());}//3.有任务未执行且正在运行的任务数小于最大通道限制IteratorConfiguration iterator taskQueue.iterator();while(iterator.hasNext() runTasks.size() channelNumber){Configuration taskConfig iterator.next();Integer taskId taskConfig.getInt(CoreConstant.TASK_ID);int attemptCount 1;TaskExecutor lastExecutor taskFailedExecutorMap.get(taskId);if(lastExecutor!null){attemptCount lastExecutor.getAttemptCount() 1;long now System.currentTimeMillis();long failedTime lastExecutor.getTimeStamp();if(now - failedTime taskRetryIntervalInMsec){ //未到等待时间继续留在队列continue;}if(!lastExecutor.isShutdown()){ //上次失败的task仍未结束if(now - failedTime taskMaxWaitInMsec){markCommunicationFailed(taskId);reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, task failover等待超时);}else{lastExecutor.shutdown(); //再次尝试关闭continue;}}else{LOG.info(taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown,this.taskGroupId, taskId, lastExecutor.getAttemptCount());}}Configuration taskConfigForRun taskMaxRetryTimes 1 ? 
taskConfig.clone() : taskConfig;TaskExecutor taskExecutor new TaskExecutor(taskConfigForRun, attemptCount);taskStartTimeMap.put(taskId, System.currentTimeMillis());taskExecutor.doStart();iterator.remove();runTasks.add(taskExecutor);//上面增加task到runTasks列表因此在monitor里注册。taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));taskFailedExecutorMap.remove(taskId);LOG.info(taskGroup[{}] taskId[{}] attemptCount[{}] is started,this.taskGroupId, taskId, attemptCount);}//4.任务列表为空executor已结束, 搜集状态为success---成功if (taskQueue.isEmpty() isAllTaskDone(runTasks) containerCommunicator.collectState() State.SUCCEEDED) {// 成功的情况下也需要汇报一次。否则在任务结束非常快的情况下采集的信息将会不准确lastTaskGroupContainerCommunication reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);LOG.info(taskGroup[{}] completed its tasks., this.taskGroupId);break;}// 5.如果当前时间已经超出汇报时间的interval那么我们需要马上汇报long now System.currentTimeMillis();if (now - lastReportTimeStamp reportIntervalInMillSec) {lastTaskGroupContainerCommunication reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);lastReportTimeStamp now;//taskMonitor对于正在运行的task每reportIntervalInMillSec进行检查for(TaskExecutor taskExecutor:runTasks){taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));}}Thread.sleep(sleepIntervalInMillSec);}//6.最后还要汇报一次reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);} catch (Throwable e) {Communication nowTaskGroupContainerCommunication this.containerCommunicator.collect();if (nowTaskGroupContainerCommunication.getThrowable() null) {nowTaskGroupContainerCommunication.setThrowable(e);}nowTaskGroupContainerCommunication.setState(State.FAILED);this.containerCommunicator.report(nowTaskGroupContainerCommunication);throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);}finally 
{if(!PerfTrace.getInstance().isJob()){//最后打印cpu的平均消耗GC的统计VMInfo vmInfo VMInfo.getVmInfo();if (vmInfo ! null) {vmInfo.getDelta(false);LOG.info(vmInfo.totalString());}LOG.info(PerfTrace.getInstance().summarizeNoException());}}}private MapInteger, Configuration buildTaskConfigMap(ListConfiguration configurations){MapInteger, Configuration map new HashMapInteger, Configuration();for(Configuration taskConfig : configurations){int taskId taskConfig.getInt(CoreConstant.TASK_ID);map.put(taskId, taskConfig);}return map;}private ListConfiguration buildRemainTasks(ListConfiguration configurations){ListConfiguration remainTasks new LinkedListConfiguration();for(Configuration taskConfig : configurations){remainTasks.add(taskConfig);}return remainTasks;}private TaskExecutor removeTask(ListTaskExecutor taskList, int taskId){IteratorTaskExecutor iterator taskList.iterator();while(iterator.hasNext()){TaskExecutor taskExecutor iterator.next();if(taskExecutor.getTaskId() taskId){iterator.remove();return taskExecutor;}}return null;}private boolean isAllTaskDone(ListTaskExecutor taskList){for(TaskExecutor taskExecutor : taskList){if(!taskExecutor.isTaskFinished()){return false;}}return true;}private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount){Communication nowTaskGroupContainerCommunication this.containerCommunicator.collect();nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());Communication reportCommunication CommunicationTool.getReportCommunication(nowTaskGroupContainerCommunication,lastTaskGroupContainerCommunication, taskCount);this.containerCommunicator.report(reportCommunication);return reportCommunication;}private void markCommunicationFailed(Integer taskId){Communication communication containerCommunicator.getCommunication(taskId);communication.setState(State.FAILED);}/*** TaskExecutor是一个完整task的执行器* 其中包括11的reader和writer*/class TaskExecutor {private Configuration taskConfig;private int 
taskId;private int attemptCount;private Channel channel;private Thread readerThread;private Thread writerThread;private ReaderRunner readerRunner;private WriterRunner writerRunner;/*** 该处的taskCommunication在多处用到* 1. channel* 2. readerRunner和writerRunner* 3. reader和writer的taskPluginCollector*/private Communication taskCommunication;public TaskExecutor(Configuration taskConf, int attemptCount) {// 获取该taskExecutor的配置this.taskConfig taskConf;Validate.isTrue(null ! this.taskConfig.getConfiguration(CoreConstant.JOB_READER) null ! this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER),[reader|writer]的插件参数不能为空!);// 得到taskIdthis.taskId this.taskConfig.getInt(CoreConstant.TASK_ID);this.attemptCount attemptCount;/*** 由taskId得到该taskExecutor的Communication* 要传给readerRunner和writerRunner同时要传给channel作统计用*/this.taskCommunication containerCommunicator.getCommunication(taskId);Validate.notNull(this.taskCommunication,String.format(taskId[%d]的Communication没有注册过, taskId));this.channel ClassUtil.instantiate(channelClazz,Channel.class, configuration);this.channel.setCommunication(this.taskCommunication);/*** 获取transformer的参数*/ListTransformerExecution transformerInfoExecs TransformerUtil.buildTransformerInfo(taskConfig);/*** 生成writerThread*/writerRunner (WriterRunner) generateRunner(PluginType.WRITER);this.writerThread new Thread(writerRunner,String.format(%d-%d-%d-writer,jobId, taskGroupId, this.taskId));//通过设置thread的contextClassLoader即可实现同步和主程序不通的加载器this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME)));/*** 生成readerThread*/readerRunner (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);this.readerThread new Thread(readerRunner,String.format(%d-%d-%d-reader,jobId, taskGroupId, this.taskId));/*** 通过设置thread的contextClassLoader即可实现同步和主程序不通的加载器*/this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.taskConfig.getString(CoreConstant.JOB_READER_NAME)));}public 
void doStart() {this.writerThread.start();// reader没有起来writer不可能结束if (!this.writerThread.isAlive() || this.taskCommunication.getState() State.FAILED) {throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,this.taskCommunication.getThrowable());}this.readerThread.start();// 这里reader可能很快结束if (!this.readerThread.isAlive() this.taskCommunication.getState() State.FAILED) {// 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,this.taskCommunication.getThrowable());}}private AbstractRunner generateRunner(PluginType pluginType) {return generateRunner(pluginType, null);}private AbstractRunner generateRunner(PluginType pluginType, ListTransformerExecution transformerInfoExecs) {AbstractRunner newRunner null;TaskPluginCollector pluginCollector;switch (pluginType) {case READER:newRunner LoadUtil.loadPluginRunner(pluginType,this.taskConfig.getString(CoreConstant.JOB_READER_NAME));newRunner.setJobConf(this.taskConfig.getConfiguration(CoreConstant.JOB_READER_PARAMETER));pluginCollector ClassUtil.instantiate(taskCollectorClass, AbstractTaskPluginCollector.class,configuration, this.taskCommunication,PluginType.READER);RecordSender recordSender;if (transformerInfoExecs ! 
null transformerInfoExecs.size() 0) {recordSender new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,this.taskCommunication ,pluginCollector, transformerInfoExecs);} else {recordSender new BufferedRecordExchanger(this.channel, pluginCollector);}((ReaderRunner) newRunner).setRecordSender(recordSender);/*** 设置taskPlugin的collector用来处理脏数据和job/task通信*/newRunner.setTaskPluginCollector(pluginCollector);break;case WRITER:newRunner LoadUtil.loadPluginRunner(pluginType,this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));newRunner.setJobConf(this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));pluginCollector ClassUtil.instantiate(taskCollectorClass, AbstractTaskPluginCollector.class,configuration, this.taskCommunication,PluginType.WRITER);((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(this.channel, pluginCollector));/*** 设置taskPlugin的collector用来处理脏数据和job/task通信*/newRunner.setTaskPluginCollector(pluginCollector);break;default:throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, Cant generateRunner for: pluginType);}newRunner.setTaskGroupId(taskGroupId);newRunner.setTaskId(this.taskId);newRunner.setRunnerCommunication(this.taskCommunication);return newRunner;}// 检查任务是否结束private boolean isTaskFinished() {// 如果reader 或 writer没有完成工作那么直接返回工作没有完成if (readerThread.isAlive() || writerThread.isAlive()) {return false;}if(taskCommunicationnull || !taskCommunication.isFinished()){return false;}return true;}private int getTaskId(){return taskId;}private long getTimeStamp(){return taskCommunication.getTimestamp();}private int getAttemptCount(){return attemptCount;}private boolean supportFailOver(){return writerRunner.supportFailOver();}private void shutdown(){writerRunner.shutdown();readerRunner.shutdown();if(writerThread.isAlive()){writerThread.interrupt();}if(readerThread.isAlive()){readerThread.interrupt();}}private boolean isShutdown(){return !readerThread.isAlive() !writerThread.isAlive();}}
}