1、新建pipeline流水线
package com.summer.toolkit.model.chain;
import java.util.List;
import java.util.concurrent.Executor;
public interface Pipeline<T> {
/**
* 向pipeline中添加一个执行器
*
* @param handler 执行器
* @return 返回pipeline对象
*/
Pipeline<T> addLast(Handler<T> handler);
/**
* 向pipeline中添加一个执行器
*
* @param name 执行器名称
* @param handler 执行器
* @return 返回pipeline对象
*/
Pipeline<T> addLast(String name, Handler<T> handler);
/**
* pipeline执行
*
* @param list 数据集合
* @return 返回值,执行完成返回true
*/
boolean execute(List<T> list);
/**
* pipeline并行执行
*
* @param list 数据集合
* @param executor 线程池
* @return 返回值,执行完成返回true
*/
boolean parallelExecute(List<T> list, Executor executor);
/**
* pipeline执行
*
* @param object 单个数据
* @return 返回值,执行完成返回true
*/
boolean execute(T object);
}
2、定义处理器
package com.summer.toolkit.model.chain;
public interface Handler<T> {
/**
* 处理器处理方法
*
* @param handlerContext 上下文
* @param t 要处理的数据
*/
void doHandler(HandlerContext<T> handlerContext, T t);
}
3、定义处理器上下文
package com.summer.toolkit.model.chain;
import lombok.Data;
@Data
public class HandlerContext<T> {
/*** 执行器名称 */
private String name;
/*** 执行器 */
private Handler<T> handler;
/*** 链表的下一个节点,用来保存下一个执行器 */
public HandlerContext<T> next;
public HandlerContext(Handler<T> handler) {
this.name = handler.getClass().getName();
this.handler = handler;
}
public HandlerContext(String name, Handler<T> handler) {
this.name = name;
this.handler = handler;
}
/**
* 调用该方法即调用上下文中处理器的执行方法
*
* @param t 需要处理的数据
*/
public void handle(T t) {
this.handler.doHandler(this, t);
}
/**
* 执行下一个节点的处理器
*
* @param t 待执行的数据
*/
public void runNext(T t) {
if (this.next != null) {
this.next.handle(t);
}
}
}
4、pipeline流水线实现
package com.summer.toolkit.model.chain;
import com.summer.toolkit.util.CollectionUtils;
import com.summer.toolkit.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@Slf4j
public class DefaultPipeline<T> implements Pipeline<T> {
/**
* 默认pipeline中有一个处理器上下文的头结点
* 头结点无处理逻辑,直接执行下一个节点的处理器
*/
HandlerContext<T> head = new HandlerContext<>(HandlerContext::runNext);
@Override
public Pipeline<T> addLast(Handler<T> handler) {
this.addLast(null, handler);
return this;
}
@Override
public Pipeline<T> addLast(String name, Handler<T> handler) {
if (handler == null) {
log.warn("处理器为空,不进行添加!");
return this;
}
if (StringUtils.isEmpty(name)) {
name = handler.getClass().getName();
}
// 将处理器添加到处理器上下文的尾节点
HandlerContext<T> context = head;
while (context.next != null) {
context = context.next;
}
context.next = new HandlerContext<T>(name, handler);
return this;
}
@Override
public boolean execute(List<T> list) {
List<Object> result = list.stream()
.peek(this::execute)
.collect(Collectors.toList());
return true;
}
@Override
public boolean parallelExecute(List<T> list, Executor executor) {
Map<String, List<T>> parts = this.split(list);
List<CompletableFuture<Boolean>> results = new ArrayList<>();
for (Map.Entry<String, List<T>> entry : parts.entrySet()) {
CompletableFuture<Boolean> completableFuture = CompletableFuture
// 提交任务
.supplyAsync(() -> this.execute(entry.getValue()), executor)
// 打印异常信息
.exceptionally(e -> {
log.error("并行处理数据时发生异常!{}", e.getMessage(), e);
return Boolean.FALSE;
});
results.add(completableFuture);
}
CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
return true;
}
@Override
public boolean execute(T t) {
this.head.handle(t);
return true;
}
/**
* 对集合进行分组拆分
*
* @param list 集合
* @return 返回值
*/
private Map<String, List<T>> split(List<T> list) {
Map<String, List<T>> parts = new HashMap<>(8);
if (CollectionUtils.isEmpty(list)) {
return parts;
}
// 如果集合数量过少,则不进行分组
int limit = 10;
if (list.size() < limit) {
String key = String.valueOf(0);
parts.put(key, list);
return parts;
}
// 固定分五个分组
int group = 5;
for (int i = 0, length = list.size(); i < length; i++) {
int key = i % group;
List<T> part = parts.computeIfAbsent(String.valueOf(key), k -> new ArrayList<>());
T t = list.get(i);
part.add(t);
}
return parts;
}
}
5、处理器抽象类实现
package com.summer.toolkit.model.chain;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractHandler<T> implements Handler<T> {
/**
* 开始处理数据,通用方法
*
* @param handlerContext 上下文
* @param t 要处理的数据
*/
@Override
public void doHandler(HandlerContext<T> handlerContext, T t) {
long start = System.currentTimeMillis();
String threadName = Thread.currentThread().getName();
String handlerName = handlerContext.getName();
log.info("====={} 开始处理:{}=====", threadName, handlerName);
try {
// 此处处理异常,如果执行过程失败,则继续执行下一个handler
this.handle(t);
} catch (Throwable throwable) {
log.error("====={} 处理异常:{},异常原因:{}=====", threadName, handlerName, throwable.getMessage(), throwable);
this.handleException(t, throwable);
}
long end = System.currentTimeMillis();
log.info("====={} 处理完成:{},耗时:{} 毫秒=====", threadName, handlerName, (end - start));
// 处理完该上下文中的处理器逻辑后,调用上下文中的下一个执行器的执行方法
handlerContext.runNext(t);
}
/**
* 处理数据抽象方法,由子类实现具体细节
*
* @param t 对象
*/
public abstract void handle(T t);
/**
* 处理数据抽象方法,由子类实现具体细节
*
* @param t 对象
* @param throwable 异常对象
*/
public void handleException(T t, Throwable throwable) {
log.error("=====处理数据发生异常:{}", throwable.getMessage(), throwable);
}
}
6、pipeline流水线构建者
package com.summer.toolkit.model.chain;
public class DefaultPipelineBuilder<T> {
private final Pipeline<T> pipeline;
public DefaultPipelineBuilder() {
this.pipeline = new DefaultPipeline<>();
}
/**
* 向pipeline中添加一个执行器
*
* @param handler 执行器
* @return 返回pipeline对象
*/
public DefaultPipelineBuilder<T> addLast(Handler<T> handler) {
pipeline.addLast(handler);
return this;
}
/**
* 向pipeline中添加一个执行器
*
* @param name 执行器名称
* @return 返回pipeline对象
*/
public DefaultPipelineBuilder<T> addLast(String name, Handler<T> handler) {
pipeline.addLast(name, handler);
return this;
}
/**
* 返回pipeline对象
*
* @return 返回值
*/
public Pipeline<T> build() {
return this.pipeline;
}
}
7、具体处理器实现
package com.summer.toolkit.model.chain;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
@Slf4j
public class StringHandler extends AbstractHandler<String> {
@Override
public void handle(String s) {
log.info("入参:{}", s);
}
@Override
public void handleException(String s, Throwable throwable) {
if (Objects.nonNull(throwable)) {
log.error("异常:{}", throwable.getMessage());
}
}
}
8、流水线测试
package com.summer.toolkit.model;
import com.summer.toolkit.model.chain.DefaultPipelineBuilder;
import com.summer.toolkit.model.chain.Pipeline;
import com.summer.toolkit.model.chain.StringHandler;
public class Processor {
public static void main(String[] args) {
DefaultPipelineBuilder<String> builder = new DefaultPipelineBuilder<>();
Pipeline<String> pipeline = builder
.addLast("字符串信息", new StringHandler())
.addLast("寄件人信息", new StringHandler())
.addLast("收件人信息", new StringHandler())
.build();
pipeline.execute("1");
}
}
9、运行结果
20:03:00.285 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:字符串信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:字符串信息,耗时:5 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:寄件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:寄件人信息,耗时:0 毫秒=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 开始处理:收件人信息=====
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.StringHandler - 入参:1
20:03:00.289 [main] INFO com.summer.toolkit.model.chain.AbstractHandler - =====main 处理完成:收件人信息,耗时:0 毫秒=====