当前位置: 首页 > news >正文

山东省建设执业资格注册管理中心网站网站app开发建设

山东省建设执业资格注册管理中心网站,网站app开发建设,给wordpress程序提速,网站收录了文章不收录这是一篇介绍PowerJob#xff0c;Server端Actor的文章#xff0c;如果感兴趣可以请点个关注#xff0c;大家互相交流一下吧。 server端一共有两个Actor#xff0c;一个是处理worker传过来的信息#xff0c;一个是server之间的信息传递。 处理Worker的Actor叫做WorkerRequ… 这是一篇介绍PowerJobServer端Actor的文章如果感兴趣可以请点个关注大家互相交流一下吧。 server端一共有两个Actor一个是处理worker传过来的信息一个是server之间的信息传递。 处理Worker的Actor叫做WorkerRequestAkkaHandler处理Server之间的叫做FriendRequestHandler。从名字来看也是非常的有意思server之间彼此是朋友worker之间没有什么朋友有的只是上下级说跑偏了。 WorkerRequestAkkaHandler 主要接收五种类型的是消息 来自worker的心跳信息确保worker是活着的 任务实例的状态信息查看worker的工作进展 worker的日志信息监视worker是工作每一步是否有错误 worker部署容器的信息worker额外做了哪些工作 查询执行器集群的信息来新员工要第一时间知道 心跳信息的发送与接收 timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS); 心跳的发送是由worker端的WorkerHealthReporter的run方法发送的该类实现了Runnable接口在worker启动的时候被设置成了每隔15秒执行一次是worker的后台执行的程序。 心跳的接收是由server端的WorkerRequestAkkaHandler接收之后将信息存入到内存中顺便记录日志可以自行接入到ELK系统中去如果连接到ELK。 这一步操作的作用就是确认worker都活着当有任务来临的时候将任务发送到所有活着的或者发送到状态更好的worker去执行。 任务实例的状态信息 发送方主要是TaskTracker因为TaskTracker是一个抽象类所以有两个实现类一个是FrequentTaskTracker主要负责是秒级任务一个是CommonTaskTracker主要负责管理JobInstance的运行负责任务派发这三个类均会发送任务实例的状态信息抽象类TaskTracker主要是在创建任务的时候如果发生异常就会向server发送发生异常的任务实例的状态信息源代码如下 public static TaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {try {... ...} catch (Exception e) {// 直接发送失败请求TaskTrackerReportInstanceStatusReq response new TaskTrackerReportInstanceStatusReq();//这就是一堆set信息没什么好看的...String serverPath AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());ActorSelection serverActor workerRuntime.getActorSystem().actorSelection(serverPath);serverActor.tell(response, null);}return null; } FrequentTaskTracker主要是在Check内部类里面的的reportStatus方法执行是一个定时执行的方法。 CommonTaskTracker也是在一个内部类StatusCheckRunnable里面的innerRun方法执行主要是检查当前任务的执行状态每隔13秒回执行一次这个时间可以在启动java的时候设置。 接收端是server的WorkerRequestAkkaHandler类接收到信息之后更新任务状态主要代码是InstanceManager的updateStatus方法。源代码如下为了篇幅不太长有些日志输出给省略了大部分都是源代码的注释说明感觉说的挺详细还不乏幽默感所以就保留了。 public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {Long instanceId req.getInstanceId();// 获取相关数据JobInfoDO jobInfo instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId());InstanceInfoDO instanceInfo instanceInfoRepository.findByInstanceId(instanceId);if (instanceInfo null) {return;}// 丢弃过期的上报数据if (req.getReportTime() instanceInfo.getLastReportTime()) {return;}// 丢弃非目标 TaskTracker 的上报数据脑裂情况if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {return;}InstanceStatus receivedInstanceStatus InstanceStatus.of(req.getInstanceStatus());Integer timeExpressionType jobInfo.getTimeExpressionType();// 更新 最后上报时间 和 修改时间instanceInfo.setLastReportTime(req.getReportTime());instanceInfo.setGmtModified(new Date());// 下面这个IF主要是处理FrequentTaskTracker发来的消息// FREQUENT 任务没有失败重试机制TaskTracker一直运行即可只需要将存活信息同步到DB即可// FREQUENT 任务的 newStatus 只有2中情况一种是 RUNNING一种是 FAILED表示该机器 overload需要重新选一台机器执行// 综上直接把 status 和 runningNum 同步到DB即可if (TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType)) {// 如果实例处于失败状态则说明该 worker 失联了一段时间被 server 判定为宕机而此时该秒级任务有可能已经重新派发了故需要 Kill 掉该实例if (instanceInfo.getStatus() InstanceStatus.FAILED.getV()) {stopInstance(instanceId, instanceInfo);return;}LifeCycle lifeCycle LifeCycle.parse(jobInfo.getLifecycle());// 检查生命周期是否已结束if (lifeCycle.getEnd() ! null lifeCycle.getEnd() System.currentTimeMillis()) {stopInstance(instanceId, instanceInfo);instanceInfo.setStatus(InstanceStatus.SUCCEED.getV());} else {instanceInfo.setStatus(receivedInstanceStatus.getV());}instanceInfo.setResult(req.getResult());instanceInfo.setRunningTimes(req.getTotalTaskNum());instanceInfoRepository.saveAndFlush(instanceInfo);// 任务需要告警if (req.isNeedAlert()) {alert(instanceId, req.getAlertContent());}return;}// 更新运行次数if (instanceInfo.getStatus() InstanceStatus.WAITING_WORKER_RECEIVE.getV()) {// 这里不会存在并发问题instanceInfo.setRunningTimes(instanceInfo.getRunningTimes() 1);}// QAQ 不能提前变更 status否则会导致更新运行次数的逻辑不生效继而导致普通任务 无限重试instanceInfo.setStatus(receivedInstanceStatus.getV());boolean finished false;if (receivedInstanceStatus InstanceStatus.SUCCEED) {instanceInfo.setResult(req.getResult());instanceInfo.setFinishedTime(System.currentTimeMillis());finished true;} else if (receivedInstanceStatus InstanceStatus.FAILED) {// 当前重试次数 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 if (instanceInfo.getRunningTimes() jobInfo.getInstanceRetryNum()) {// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败instanceInfo.setExpectedTriggerTime(System.currentTimeMillis() 10000);// 修改状态为 等待派发正式开始重试// 问题会丢失以往的调度记录actualTriggerTime什么的都会被覆盖instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());} else {instanceInfo.setResult(req.getResult());instanceInfo.setFinishedTime(System.currentTimeMillis());finished true;}}// 同步状态变更信息到数据库instanceInfoRepository.saveAndFlush(instanceInfo);if (finished) {// 这里的 InstanceStatus 只有 成功/失败 两种手动停止不会由 TaskTracker 上报processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());}} 所谓脑裂问题就是同一个集群中的不同节点对于集群的状态有了不一样的理解 worker的日志信息 timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS); 发送方式Worker中的OmsLogHandler类里的LogSubmitter内部类的run方法也是另起线程进行处理的将产生的日记内容进行上传这里面使用了一个锁保证只有一个线程上传日志。 接收端是server的WorkerRequestAkkaHandler类接收之后保存到数据库中。 worker部署容器的信息 发送端是Worker的OmsContainerFactory类中的fetchContainer方法该方法是由WorkActor触发的当server要部署容器的时候会向WorkerActor接收然后调用方法onReceiveServerDeployContainerRequest方法中判断该容器是否已经保存在本地如果没有再通过fetchContainer向server的WorkerRequestAkkaHandler发送请求获取容器信息然后部署。 接收端是server的WorkerRequestAkkaHandler类,接收到信息之后server会将容器idnameversion和下载的url发回给worker让worker通过url下载容器进行部署。 查询执行器集群的信息 发送端是worker的TaskTracker类的内部类WorkerDetector的run方法如果是秒级任务在任务初始化的时候会设置成每一分钟执行一次在FrequentTaskTracker的initTaskTracker方法内 scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); 如果是正常的任务任务类型是Map或者MapReduce会执行该方法 if (executeType ExecuteType.MAP || executeType ExecuteType.MAP_REDUCE) { scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); } 接收端是server的WorkerRequestAkkaHandler类接收之后将所有可以使用的worker的信息返回。 FriendRequestHandler 主要接收两种类型的信息 Ping 检测目标机器是否存活还有和我一个级别的活人吗 RemoteProcessReq 远程执行命令告诉你的直接下属干活我不想得罪人 检测目标机器是否存活 发送方式server的ServerElectionService类的activeAddress方法该方法是worker启动的时候连接server时调用acquire服务的时候会调用该方法该方法会向worker发送的server地址询问目前存活的所有server地址信息。 触发的起点是在Worker的PowerJobWorker的init()中 serverDiscoveryService.start(timingPool); 》ServerDiscoveryService的start方法的this.currentServerAddress discovery(); 》ServerDiscoveryService的discovery方法的result acquire(这个地址不重要重要的是调用了这个方法); 》ServerDiscoveryService的acquire方法的result CommonUtils.executeWithRetry0(() - HttpUtils.get(url)); 然后转到了Server的ServerController类的acquireServer方法中 return ResultDTO.success(serverElectionService.elect(appId, protocol, currentServer)); 》ServerElectionService的elect方法的return getServer0(appId, protocol); 》ServerElectionService的getServer0方法的String activeAddress activeAddress(originServer, downServerCache, protocol); 》ServerElectionService的activeAddress方法的CompletionStageObject askCS Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); 以上就是调用前的全部步骤了。 接收方是server的FriendRequestHandler返回给询问方目前所有存活的server地址。 远程执行命令 发送方式server中JobServer上的注解DesignateServer的切面方法在server执行某个任务时会对当前worker的直属server进行判断如果worker的直属server是当前调度任务的server则直接执行如果不是则会将该方法发送给直属server进行执行。 比如说立即执行任务的命令会在JobService的runJob中执行但是该方法上有一个注解DesignateServer这也就会在方法执行之前调用DesignateServerAspect的execute方法如果将目标server地址与本地地址进行对比不一样则会执行该远程方法。 接收方是server的FriendRequestHandler接到执行方法的类名方法名入参和返回值等信息执行方法。执行方法是在RemoteRequestProcessor类中。 总结 server的这两个Actor职责划分还是很清晰的不过感觉将Actor仅仅只是用在通信上有点大材小用的感觉Actor这个单词本身就是将其比作一个演员应该是扮演某个角色当然了让其仅仅扮演一个手机可能也是个不错的想法。Akka-remote的底层是Netty如果直接使用Netty估计也可以只不过Akka将其进行了封装使用起来能够更方便一些不过就是给人一种用大炮打蚊子的感觉。
http://www.dnsts.com.cn/news/44705.html

相关文章:

  • 工信部网站备案查通知网站建设公司 资讯
  • 网站停留时间 从哪里获取网站开发项目报告
  • 网站优化怎么做分录品牌型网站的作用
  • 网站建设腾讯云石家庄seo网站优化
  • 有没有关于网站开发的名人访谈广州建设交易中心网站
  • 电商网站销售数据分析wordpress月份归档要收录吗
  • 源代码网站培训指定网站怎么设置路由器只访问
  • 深圳网站建设的特殊性做微分销系统多少钱
  • 网站策划的基本过程丽江市建设局官方网站
  • 专业整站优化wordpress外观选单分类添加不
  • 成都网站建设单招网运城seo
  • 做go kegg的网站四川省乐山市建设银行网站
  • 和网站签约新闻网站搭建平台demo免费
  • 建设网站的服务端口wordpress资源占用
  • 福州服务专业公司网站建设安全生产标准化建设网站
  • 什么叫网站外链天津百度推广代理商
  • 建设银行网站查询房贷信息百度网站电话是多少
  • 网站开发工程师待遇淄博做网站公司融资多少
  • 生成短链接的网站做不做生意都要知道的网站
  • 西安做网站收费价格手机做服务器建网站
  • 网站开发项目私活网站没有后台怎么更新文章
  • 与狗狗做网站长治市住房保障和城乡建设管理局网站
  • 网站怎么做 流程vue怎么做网页
  • 网站推荐广告模板宁乡县住房和城乡建设局网站
  • jsp网站开发制作网站怎么做收费
  • 怎么在另外一台电脑的浏览器打开自己做的网站地址栏输入什么兰州市解封最新消息
  • 广东深圳网站建设服务网站漏洞解决办法
  • 邢台网站建设邢台个人网站快速备案
  • 网站建设仟金手指专业12建设网站大全
  • 高端交互式网站建设青海城乡建设网站