背景
在一次需求中, 需要每天定时提示信息到企业微信中. 查看任务的运行进展, 所以准备在企微群聊中添加机器人, 使用xxl-job配置的定时任务. 原本计划是每天一次. 但配置成了每秒一次.
等到触发时, 发现时间不对. 这时做了几个动作(不分先后):
- 停止定时任务
- 编辑定时任务到正确的执行间隔时间
- 移除现有的企微通知机器人
- 等做完这些后, 我新建了一个企业微信机器人 重新加入到群聊. 然后开启任务.
结果机器人还是按照1秒1条消息发送.
这时我突然想到了 job的一点工作原理~ 是否是消息积压了
那么 什么是xxl-job
什么是 xxl-job, 这个就不用过多解释. 我们直接看架构图.
xxl-job 核心是两部分组成:
1.调度中心
调度中心有个核心的组件为: 时间轮. 比如把每分钟60秒 每秒是一个槽点 组成一个圆环. 后续重点讲一讲.
2.执行器
执行器里面有一个队列来存储 调度的请求, 然后启用 jobhander(任务) 执行, 这里有个重点的源代码为:
com.xxl.job.core.executor.impl.XxlJobSpringExecutor
这也是他们的设计思想:
将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
3. 从使用的项目中的执行器开始了解
项目首先需要添加依赖:
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
这个依赖的重点之一, 就是 XxlJobSpringExecutor, 这个类实现了SmartInitializingSingleton接口, 所以经过Bean的生命周期,一定会调用afterSingletonsInstantiated这个方法的实现
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
// start
@Override
public void afterSingletonsInstantiated() {
// 实例化化 根据@xxlJob 注解标注的方法实例, 然后以key/value的形式存储在 concurrentMap<String, IjobHander>中
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
// 开始初始化, 日志路径, 客户端,日志的清理,回调线程,以及执行器服务
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
4. JobHandler 也就是定时任务的封装.
在初始化方法中 initJobHandlerMethodRepository(applicationContext); 里面就实例化了jobHandler
public abstract class IJobHandler {
/**
* execute handler, invoked when executor receives a scheduling request
*
* @throws Exception
*/
public abstract void execute() throws Exception;
/**
* init handler, invoked when JobThread init
*/
public void init() throws Exception {
// do something
}
/**
* destroy handler, invoked when JobThread destroy
*/
public void destroy() throws Exception {
// do something
}
// 代码就没有贴全了 registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
// registry jobhandler
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
// 保存job的地方
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
IJobHandler 的继承类有三个 GlueJobHandler, MethodJobHandler, ScriptJobHandler, 当然你也可以自己实现IJobHandler, 然后手动注册到JobHandler的缓存中,也是可以通过调度中心触发的
这里我们就只讲我们用到的 MethodJobHandler; 里面实际比较简单, 就是使用反射调用方法.
@Override
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
5. 灵魂拷问
到此为止, 项目中的配置@xxl-job注解如何初始化,如何调用执行就结束了 那么我们怎么接收来自 调度中心的请求呢?
6. server 服务器
创建一个Http服务器
除了初始化JobHandler之外,执行器还会创建一个Http服务器, com.xxl.job.core.server.EmbedServer
这个服务器端口号就是通过XxlJobSpringExecutor配置的端口,download-admin-server中就是设置的是9527,底层是基于Netty实现的</