好的兼职做调查网站,注册小公司,深圳低价网站建设,做网站如何获取收益文章目录 1、新建pipeline流水线2、定义处理器3、定义处理器上下文4、pipeline流水线实现5、处理器抽象类实现6、pipeline流水线构建者7、具体处理器实现8、流水线测试9、运行结果 1、新建pipeline流水线
package com.summer.toolkit.model.chain;import java.util.List;
impo… 文章目录 1、新建pipeline流水线2、定义处理器3、定义处理器上下文4、pipeline流水线实现5、处理器抽象类实现6、pipeline流水线构建者7、具体处理器实现8、流水线测试9、运行结果 1、新建pipeline流水线
package com.summer.toolkit.model.chain;import java.util.List;
import java.util.concurrent.Executor;public interface PipelineT {/*** 向pipeline中添加一个执行器** param handler 执行器* return 返回pipeline对象*/PipelineT addLast(HandlerT handler);/*** 向pipeline中添加一个执行器** param name 执行器名称* param handler 执行器* return 返回pipeline对象*/PipelineT addLast(String name, HandlerT handler);/*** pipeline执行** param list 数据集合* return 返回值执行完成返回true*/boolean execute(ListT list);/*** pipeline并行执行** param list 数据集合* param executor 线程池* return 返回值执行完成返回true*/boolean parallelExecute(ListT list, Executor executor);/*** pipeline执行** param object 单个数据* return 返回值执行完成返回true*/boolean execute(T object);}
2、定义处理器
package com.summer.toolkit.model.chain;public interface HandlerT {/*** 处理器处理方法** param handlerContext 上下文* param t 要处理的数据*/void doHandler(HandlerContextT handlerContext, T t);}
3、定义处理器上下文
package com.summer.toolkit.model.chain;import lombok.Data;Data
public class HandlerContextT {/*** 执行器名称 */private String name;/*** 执行器 */private HandlerT handler;/*** 链表的下一个节点用来保存下一个执行器 */public HandlerContextT next;public HandlerContext(HandlerT handler) {this.name handler.getClass().getName();this.handler handler;}public HandlerContext(String name, HandlerT handler) {this.name name;this.handler handler;}/*** 调用该方法即调用上下文中处理器的执行方法** param t 需要处理的数据*/public void handle(T t) {this.handler.doHandler(this, t);}/*** 执行下一个节点的处理器** param t 待执行的数据*/public void runNext(T t) {if (this.next ! null) {this.next.handle(t);}}
}
4、pipeline流水线实现
package com.summer.toolkit.model.chain;import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;Slf4j
public class DefaultPipelineT implements PipelineT {/*** 默认pipeline中有一个处理器上下文的头结点* 头结点无处理逻辑直接执行下一个节点的处理器*/HandlerContextT head new HandlerContext(HandlerContext::runNext);Overridepublic PipelineT addLast(HandlerT handler) {this.addLast(null, handler);return this;}Overridepublic PipelineT addLast(String name, HandlerT handler) {if (handler null) {log.warn(处理器为空不进行添加);return this;}if (StringUtils.isEmpty(name)) {name handler.getClass().getName();}// 将处理器添加到处理器上下文的尾节点HandlerContextT context head;while (context.next ! null) {context context.next;}context.next new HandlerContextT(name, handler);return this;}Overridepublic boolean execute(ListT list) {ListObject result list.stream().peek(this::execute).collect(Collectors.toList());return true;}Overridepublic boolean parallelExecute(ListT list, Executor executor) {MapString, ListT parts this.split(list);ListCompletableFutureBoolean results new ArrayList();for (Map.EntryString, ListT entry : parts.entrySet()) {CompletableFutureBoolean completableFuture CompletableFuture// 提交任务.supplyAsync(() - this.execute(entry.getValue()), executor)// 打印异常信息.exceptionally(e - {log.error(并行处理数据时发生异常{}, e.getMessage(), e);return Boolean.FALSE;});results.add(completableFuture);}CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();return true;}Overridepublic boolean execute(T t) {this.head.handle(t);return true;}/*** 对集合进行分组拆分** param list 集合* return 返回值*/private MapString, ListT split(ListT list) {MapString, ListT parts new HashMap(8);if (CollectionUtils.isEmpty(list)) {return parts;}// 如果集合数量过少则不进行分组int limit 10;if (list.size() limit) {String key String.valueOf(0);parts.put(key, list);return parts;}// 固定分五个分组int group 5;for (int i 0, length list.size(); i length; i) {int key i % group;ListT part parts.computeIfAbsent(String.valueOf(key), k - new ArrayList());T t list.get(i);part.add(t);}return parts;}}
5、处理器抽象类实现
package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;Slf4j
public abstract class AbstractHandlerT implements HandlerT {/*** 开始处理数据通用方法** param handlerContext 上下文* param t 要处理的数据*/Overridepublic void doHandler(HandlerContextT handlerContext, T t) {long start System.currentTimeMillis();String threadName Thread.currentThread().getName();String handlerName handlerContext.getName();log.info({} 开始处理{}, threadName, handlerName);try {// 此处处理异常如果执行过程失败则继续执行下一个handlerthis.handle(t);} catch (Throwable throwable) {log.error({} 处理异常{}异常原因{}, threadName, handlerName, throwable.getMessage(), throwable);this.handleException(t, throwable);}long end System.currentTimeMillis();log.info({} 处理完成{}耗时{} 毫秒, threadName, handlerName, (end - start));// 处理完该上下文中的处理器逻辑后调用上下文中的下一个执行器的执行方法handlerContext.runNext(t);}/*** 处理数据抽象方法由子类实现具体细节** param t 对象*/public abstract void handle(T t);/*** 处理数据抽象方法由子类实现具体细节** param t 对象* param throwable 异常对象*/public void handleException(T t, Throwable throwable) {log.error(处理数据发生异常{}, throwable.getMessage(), throwable);}}
6、pipeline流水线构建者
package com.summer.toolkit.model.chain;public class DefaultPipelineBuilderT {private final PipelineT pipeline;public DefaultPipelineBuilder() {this.pipeline new DefaultPipeline();}/*** 向pipeline中添加一个执行器** param handler 执行器* return 返回pipeline对象*/public DefaultPipelineBuilderT addLast(HandlerT handler) {pipeline.addLast(handler);return this;}/*** 向pipeline中添加一个执行器** param name 执行器名称* return 返回pipeline对象*/public DefaultPipelineBuilderT addLast(String name, HandlerT handler) {pipeline.addLast(name, handler);return this;}/*** 返回pipeline对象** return 返回值*/public PipelineT build() {return this.pipeline;}}
7、具体处理器实现
package com.summer.toolkit.model.chain;import lombok.extern.slf4j.Slf4j;import java.util.Objects;Slf4j
public class StringHandler extends AbstractHandlerString {Overridepublic void handle(String s) {log.info(入参{}, s);}Overridepublic void handleException(String s, Throwable throwable) {if (Objects.nonNull(throwable)) {log.error(异常{}, throwable.getMessage());}}}
8、流水线测试
package com.summer.toolkit.model;import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;public class Processor {public static void main(String[] args) {DefaultPipelineBuilderString builder new DefaultPipelineBuilder();PipelineString pipeline builder.addLast(字符串信息, new StringHandler()).addLast(寄件人信息, new StringHandler()).addLast(收件人信息, new StringHandler()).build();pipeline.execute(1);}}
9、运行结果
20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - main 开始处理字符串信息
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - main 处理完成字符串信息耗时5 毫秒
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - main 开始处理寄件人信息
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - main 处理完成寄件人信息耗时0 毫秒
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - main 开始处理收件人信息
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - main 处理完成收件人信息耗时0 毫秒