当今做哪个网站致富,开源php源码,wordpress 主题 响应式,深圳瑞捷成立新公司xxljob可以对定时任务进行调度#xff0c;现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口#xff0c;spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程#xff1a;
admin服务端初始化
JobTriggerPoolHelper.java#toStart()方法…xxljob可以对定时任务进行调度现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程
admin服务端初始化
JobTriggerPoolHelper.java#toStart()方法中会初始化两个调用任务的线程池快线程池最大线程数为200慢线程池最大线程数为100。然后启动线程定时轮询需要调度的定时任务。首先计算每秒能处理的定时任务数量公式为(快线程池的最大线程数满线程池的最大线程数)*20(1000ms/每个任务处理的时长50ms)最多为6000。从数据库中加锁查出任务触发时间当前时间预读时间(5s)的任务然后分情况处理。
当前时间大于任务触发时间预读时间即任务触发时间已经过期超过5s此时不做任何处理只刷新任务下次触发时间当前时间大于任务触发时间但不超过5s即任务虽然过期但是过期时间不到5s此时触发任务将任务数据保存到ringDataprivate volatile static MapInteger, ListInteger ringData new ConcurrentHashMap();ringData的key是秒数value是jobid然后刷新任务的下次触发时间当前时间小于任务触发时间即还没到任务的触发时间此时也会将任务写道ringData中等到期就会进行处理因为在内存中查询任务比到数据库查询要快很多。
int preReadCount (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {boolean preReadSuc true;try {preparedStatement conn.prepareStatement( select * from xxl_job_lock where lock_name schedule_lock for update );preparedStatement.execute();// 1、pre readlong nowTime System.currentTimeMillis();ListXxlJobInfo scheduleList XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime PRE_READ_MS, preReadCount);if (scheduleList!null scheduleList.size()0) {for (XxlJobInfo jobInfo: scheduleList) {if (nowTime jobInfo.getTriggerNextTime() PRE_READ_MS) {refreshNextValidTime(jobInfo, new Date());} else if (nowTime jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()1 nowTime PRE_READ_MS jobInfo.getTriggerNextTime()) {int ringSecond (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {int ringSecond (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}最后判断任务调度状态,TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);有任务需要调度则下一秒继续扫描如果没有发现任务则睡眠5s(PRE_READ_MS)。 刚才说到待执行的任务会加入ringData现在往下看怎么处理ringData的。这里会回退一秒因为可能出现任务超时的情况导致任务处理时遗漏。处理的逻辑很简单到了某秒时根据秒数取出对应的jobid集合然后依次处理触发每个任务即可。触发任务的逻辑我们稍微再说。 ListInteger ringItemData new ArrayList();int nowSecond Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长跨过刻度向前校验一个刻度for (int i 0; i 2; i) {ListInteger tmpData ringData.remove( (nowSecond60-i)%60 );if (tmpData ! null) {ringItemData.addAll(tmpData);}}if (ringItemData.size() 0) {for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);}ringItemData.clear();}客户端初始化
客户端创建定时任务只要在bean中添加XxlJob注解即可调度任务是通过XxlJobSpringExecutor实现的。过程是到spring容器中获取所有bean找出对方法使用了XxlJob的bean然后使用MethodJobHandler进行封装注册到jobHandlerRepositoryprivate static ConcurrentMapString, IJobHandler jobHandlerRepository new ConcurrentHashMapString, IJobHandler();。 String[] beanDefinitionNames applicationContext.getBeanDefinitionNames();for (String beanDefinitionName : beanDefinitionNames) {Object bean applicationContext.getBean(beanDefinitionName);Method[] methods bean.getClass().getDeclaredMethods();for (Method method: methods) {XxlJob xxlJob AnnotationUtils.findAnnotation(method, XxlJob.class);if (xxlJob ! null) {String name xxlJob.value();method.setAccessible(true);if(xxlJob.init().trim().length() 0) {initMethod bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);}if(xxlJob.destroy().trim().length() 0) {destroyMethod bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);}registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}}客户端会启动一个netty服务器xxl-job底层的核心就是netty监听${xxl.job.executor.port}配置的端口等待来自服务端的调度。 ServerBootstrap bootstrap new ServerBootstrap();((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializerSocketChannel() {public void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();NettyHttpServer.logger.info( xxl-rpc remoting server start success, nettype {}, port {}, NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());NettyHttpServer.this.onStarted();future.channel().closeFuture().sync();服务端触发任务
触发任务是从JobTriggerPoolHelper.java#addTrigger()中开始的。默认是快线程池触发如果1min内执行时间超过500ms的次数大于10则改为满线程池。 public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {ThreadPoolExecutor triggerPool_ fastTriggerPool;AtomicInteger jobTimeoutCount jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!null jobTimeoutCount.get() 10) { // job-timeout 10 times in 1 mintriggerPool_ slowTriggerPool;}triggerPool_.execute(new Runnable() {Overridepublic void run() {long start System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {long minTim_now System.currentTimeMillis()/60000;if (minTim ! minTim_now) {minTim minTim_now;jobTimeoutCountMap.clear();}long cost System.currentTimeMillis()-start;if (cost 500) { // ob-timeout threshold 500msAtomicInteger timeoutCount jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount ! null) {timeoutCount.incrementAndGet();}真正执行是在processTrigger()方法中先根据调度策略获取处理任务的客户端地址默认是轮询策略。先获取任务id然后找到任务对应的客户端索引通过nextInt()方法找到下个索引再到客户端地址列表中根据索引获取地址。 private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){XxlJobLog jobLog new XxlJobLog();XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);TriggerParam triggerParam new TriggerParam();String address null;routeAddressResult executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() ReturnT.SUCCESS_CODE) {address routeAddressResult.getContent();}triggerResult runExecutor(triggerParam, address);
//轮询策略调度任务private static int count(int jobId) {// cache clearif (System.currentTimeMillis() CACHE_VALID_TIME) {routeCountEachJob.clear();CACHE_VALID_TIME System.currentTimeMillis() 1000*60*60*24;}// countInteger count routeCountEachJob.get(jobId);count (countnull || count1000000)?(new Random().nextInt(100)):count; // 初始化时主动Random一次缓解首次压力routeCountEachJob.put(jobId, count);return count;}Overridepublic ReturnTString route(TriggerParam triggerParam, ListString addressList) {String address addressList.get(count(triggerParam.getJobId())%addressList.size());return new ReturnTString(address);} 处理任务时通过proxy进行动态代理在XxlRpcReferenceBean.class#getObject为调度的定时任务生成了动态代理对象在InvocationHandler的invoke()方法中实现了逻辑增强最终到NettyHttpClient#asyncSend()将消息发送到客户端netty服务器。
客户端执行定时任务
客户端是在NettyHttpServerHandler#channelRead0()中处理定时任务的先对服务器的字节流进行反序列化在XxlRpcProviderFactory.class#invokeService()以反射方式远程调用ExecutorBizImpl.java#run()方法。 Class? serviceClass serviceBean.getClass();String methodName xxlRpcRequest.getMethodName();Class?[] parameterTypes xxlRpcRequest.getParameterTypes();Object[] parameters xxlRpcRequest.getParameters();Method method serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);在run方法中启动处理任务的JobThread进行处理JobThread中就是根据定时任务名获取对应的MethodJobHandler取出要执行的Method再反射执行即可。 IJobHandler newJobHandler XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());if (jobThread null) {jobThread XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}ReturnTString pushResult jobThread.pushTriggerQueue(triggerParam);总结下xxl-job首先在服务端启动线程轮询要执行的定时任务计算定时任务的触发时间然后后获取代理对象将要执行的任务信息通过netty发送到客户端客户端以反射方式执行定时任务。有不对的地方请大神指出欢迎大家一起讨论交流共同进步。