业务服务:任务调度中心


前言

任务调度中心,常被用于发送任务,发送邮件,在某个时间端执行某个任务…

一、Scheduled的使用

Scheduled是springboot内置的定时任务

1. 快速使用

开启定时任务

@SpringBootApplication
@EnableScheduling	// 开启定时任务
public class TaskApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskApplication.class, args);
    }
}

创建任务服务

@Service
@Slf4j
public class TestTask {

    // 每隔一秒执行
    @Scheduled(fixedRate = 1000)
    public void task(){
      log.info("定时任务执行了");
    }
}

启动服务后就可以看到执行效果了

在这里插入图片描述

2. 参数分析

  • cron:为任务执行表达式
  • zone:为时区
  • fixedDelay:任务执行完毕延迟多久执行下一次任务
  • fixedDelayString:任务执行完毕延迟多久执行下一次任务(字符串,支持spel)
  • fixedRate:任务执行间隔
  • fixedRateString:任务执行间隔(字符串,支持spel)
  • initialDelay:初始化后执行延迟
  • initialDelayString:初始化后执行延迟(字符串,支持spel)
  • timeUnit:时间单位
    在这里插入图片描述

随机时间执行

// 随机执行,每次执行间隔时间为0-10秒
@Scheduled(fixedRateString = "#{new Double(T(Math).random()*10000).intValue()}")
public void task2(){
    log.info("task2 执行了");
}

读取配置文件执行时间

// 配置文件执行
@Scheduled(fixedRateString = "${task.active}")
public void task3(){
    log.info("task3 执行了");
}

二、xxl-job的使用

Scheduled无法对任务进行管理只适合简单的应用。所以一般开发中我们使用xxl-job的比较多

访问官网

1. 创建数据库

#
# XXL-JOB v2.3.0
# Copyright (c) 2015-present, xuxueli.

SET NAMES utf8mb4;

CREATE TABLE `xxl_job_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
  `job_desc` varchar(255) NOT NULL,
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `author` varchar(64) DEFAULT NULL COMMENT '作者',
  `alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
  `schedule_type` varchar(50) NOT NULL DEFAULT 'NONE' COMMENT '调度类型',
  `schedule_conf` varchar(128) DEFAULT NULL COMMENT '调度配置,值含义取决于调度类型',
  `misfire_strategy` varchar(50) NOT NULL DEFAULT 'DO_NOTHING' COMMENT '调度过期策略',
  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
  `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
  `glue_source` mediumtext COMMENT 'GLUE源代码',
  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
  `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
  `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',
  `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
  `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT '执行器主键ID',
  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
  `executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
  `trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
  `trigger_code` int(11) NOT NULL COMMENT '调度-结果',
  `trigger_msg` text COMMENT '调度-日志',
  `handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
  `handle_code` int(11) NOT NULL COMMENT '执行-状态',
  `handle_msg` text COMMENT '执行-日志',
  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、1-无需告警、2-告警成功、3-告警失败',
  PRIMARY KEY (`id`),
  KEY `I_trigger_time` (`trigger_time`),
  KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log_report` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
  `running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
  `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
  `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_logglue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_id` int(11) NOT NULL COMMENT '任务,主键ID',
  `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
  `glue_source` mediumtext COMMENT 'GLUE源代码',
  `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_registry` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `registry_group` varchar(50) NOT NULL,
  `registry_key` varchar(255) NOT NULL,
  `registry_value` varchar(255) NOT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
  `title` varchar(12) NOT NULL COMMENT '执行器名称',
  `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
  `address_list` text COMMENT '执行器地址列表,多地址逗号分隔',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50) NOT NULL COMMENT '账号',
  `password` varchar(50) NOT NULL COMMENT '密码',
  `role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
  `permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor', '示例执行器', 0, NULL, '2018-11-03 22:21:31' );
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');

commit;

在这里插入图片描述

2. 搭建任务调度应用

由于xxl-job是一个应用所以需要单独引入项目启动。具体操作参考官网,这里就直接省略了

启动后登录后台,账号密码默认为admin/123456

在这里插入图片描述

3. 服务注册到xxl-job

添加依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
</dependency>

配置内容如下

--- # xxl-job 配置
xxl.job:
  # 执行器开关
  enabled: true
  # 调度中心地址:如调度中心集群部署存在多个地址则用逗号分隔。
  admin-addresses: http://localhost:9100/xxl-job-admin
  # 执行器通讯TOKEN:非空时启用
  access-token: xxl-job
  executor:
    # 执行器AppName:执行器心跳注册分组依据;为空则关闭自动注册
    appname: xxl-job-executor
    # 执行器端口号 执行器从9101开始往后写
    port: 9101
    # 执行器注册:默认IP:PORT
    address:
    # 执行器IP:默认自动获取IP
    ip:
    # 执行器运行日志文件存储磁盘路径
    logpath: ./logs/xxl-job
    # 执行器日志文件保存天数:大于3生效
    logretentiondays: 30

创建配置类

@Data
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobProperties {

    private Boolean enabled;

    private String adminAddresses;

    private String accessToken;

    private Executor executor;

    @Data
    @NoArgsConstructor
    public static class Executor {

        private String appname;

        private String address;

        private String ip;

        private int port;

        private String logPath;

        private int logRetentionDays;
    }
}
@Slf4j
@Configuration
@EnableConfigurationProperties(XxlJobProperties.class)
@AllArgsConstructor
@ConditionalOnProperty(prefix = "xxl.job", name = "enabled", havingValue = "true")
public class XxlJobConfig {

    private final XxlJobProperties xxlJobProperties;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(xxlJobProperties.getAdminAddresses());
        xxlJobSpringExecutor.setAccessToken(xxlJobProperties.getAccessToken());
        XxlJobProperties.Executor executor = xxlJobProperties.getExecutor();
        xxlJobSpringExecutor.setAppname(executor.getAppname());
        xxlJobSpringExecutor.setAddress(executor.getAddress());
        xxlJobSpringExecutor.setIp(executor.getIp());
        xxlJobSpringExecutor.setPort(executor.getPort());
        xxlJobSpringExecutor.setLogPath(executor.getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(executor.getLogRetentionDays());
        return xxlJobSpringExecutor;
    }
}

启动服务

看到这个就说明配置成功了

在这里插入图片描述

4. 快速入门

登录后端在任务管理模块,添加如下任务

在这里插入图片描述

创建任务类

@Service
@Slf4j
public class JobTask {
    
    @XxlJob("demoJobHandler")
    public void task1(){
        log.info("task1 执行了");
    }
}

启动任务

在这里插入图片描述

执行成功

在这里插入图片描述
可以在后台查看执行日志

在这里插入图片描述

5. 任务参数

JobHandler参数

这个参数值一定要和代码的@XxlJob值对应,用来定位执行的方法

在这里插入图片描述

任务参数

在这里插入图片描述

@XxlJob("demoJobHandler")
public void task1(){

    // 获取任务参数
    String jobParam = XxlJobHelper.getJobParam();
    log.info("任务参数:{}",jobParam);
}

在这里插入图片描述

路由策略(一般用于多个实例)

主要用来控制执行器的执行方式

  • 第一个:执行执行器里面的第一个
  • 最后一个:执行执行器里面的最后一个
  • 轮询:依次执行
  • 随机:随机执行
  • 一致性hash:根据执行的hash执行
  • 最不经常使用/最近最久未使用:字面意思
  • 故障转移:如果该执行器挂了,就交给其他执行器
  • 忙碌转移:如果该执行器忙,就交给其他执行器
  • 分片广播:将任务发送给所有执行器
    在这里插入图片描述
    模拟100w数据,发送给执行器并发执行

在这里插入图片描述

   @XxlJob("shardingJobHandler")
    public void task2(){

        // 当前执行器索引
        int shardIndex = XxlJobHelper.getShardIndex();

        // 所有执行器总数
        int shardTotal = XxlJobHelper.getShardTotal();


        for (int i = 0; i < 100*1000; i++){
            if(i%shardTotal==shardIndex){
                log.info("执行器:{},任务参数:{}",shardIndex,i);
            }
        }
    }

在这里插入图片描述
在这里插入图片描述

6. 命令行任务

定期备份sql

编写java代码

/**
 * 3、命令行任务
 */
@XxlJob("commandJobHandler")
public void commandJobHandler() throws Exception {
    String command = XxlJobHelper.getJobParam();
    int exitValue = -1;

    BufferedReader bufferedReader = null;
    try {
        // command process
        ProcessBuilder processBuilder = new ProcessBuilder();
        processBuilder.command(command);
        processBuilder.redirectErrorStream(true);

        Process process = processBuilder.start();
        //Process process = Runtime.getRuntime().exec(command);

        BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
        bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

        // command log
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            XxlJobHelper.log(line);
        }

        // command exit
        process.waitFor();
        exitValue = process.exitValue();
    } catch (Exception e) {
        XxlJobHelper.log(e);
    } finally {
        if (bufferedReader != null) {
            bufferedReader.close();
        }
    }

    if (exitValue == 0) {
        // default success
    } else {
        XxlJobHelper.handleFail("command exit value(" + exitValue + ") is failed");
    }
}

编写bat执行命令(windows,linux类似)

set "datestr=%date:/=-%"
mysqldump -u [数据库账号] -p[数据库密码] --databases [数据库] > "C:\Users\27561\Desktop\index-%datestr%.sql"

添加任务策略
在这里插入图片描述
执行成功后可以看到sql就备份成功了
在这里插入图片描述

7. 跨平台远程调用

编写java代码

/**
 * 4、跨平台Http任务
 * 参数示例:
 * "url: http://www.baidu.com\n" +
 * "method: get\n" +
 * "data: content\n";
 */
@XxlJob("httpJobHandler")
public void httpJobHandler() throws Exception {

    // param parse
    String param = XxlJobHelper.getJobParam();
    if (param == null || param.trim().length() == 0) {
        XxlJobHelper.log("param[" + param + "] invalid.");

        XxlJobHelper.handleFail();
        return;
    }

    String[] httpParams = param.split("\n");
    String url = null;
    String method = null;
    String data = null;
    for (String httpParam : httpParams) {
        if (httpParam.startsWith("url:")) {
            url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
        }
        if (httpParam.startsWith("method:")) {
            method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
        }
        if (httpParam.startsWith("data:")) {
            data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
        }
    }

    // param valid
    if (url == null || url.trim().length() == 0) {
        XxlJobHelper.log("url[" + url + "] invalid.");

        XxlJobHelper.handleFail();
        return;
    }
    if (method == null || !Arrays.asList("GET", "POST").contains(method)) {
        XxlJobHelper.log("method[" + method + "] invalid.");

        XxlJobHelper.handleFail();
        return;
    }
    boolean isPostMethod = method.equals("POST");

    // request
    HttpURLConnection connection = null;
    BufferedReader bufferedReader = null;
    try {
        // connection
        URL realUrl = new URL(url);
        connection = (HttpURLConnection) realUrl.openConnection();

        // connection setting
        connection.setRequestMethod(method);
        connection.setDoOutput(isPostMethod);
        connection.setDoInput(true);
        connection.setUseCaches(false);
        connection.setReadTimeout(5 * 1000);
        connection.setConnectTimeout(3 * 1000);
        connection.setRequestProperty("connection", "Keep-Alive");
        connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
        connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

        // do connection
        connection.connect();

        // data
        if (isPostMethod && data != null && data.trim().length() > 0) {
            DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
            dataOutputStream.write(data.getBytes("UTF-8"));
            dataOutputStream.flush();
            dataOutputStream.close();
        }

        // valid StatusCode
        int statusCode = connection.getResponseCode();
        if (statusCode != 200) {
            throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
        }

        // result
        bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
        StringBuilder result = new StringBuilder();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            result.append(line);
        }
        String responseMsg = result.toString();

        XxlJobHelper.log(responseMsg);

        return;
    } catch (Exception e) {
        XxlJobHelper.log(e);

        XxlJobHelper.handleFail();
        return;
    } finally {
        try {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
            if (connection != null) {
                connection.disconnect();
            }
        } catch (Exception e2) {
            XxlJobHelper.log(e2);
        }
    }

}

新建任务策略
在这里插入图片描述
执行成功后可以在日志中查询执行结果
在这里插入图片描述
在这里插入图片描述

8. 初始化与销毁

  • init会在第一次执行任务的时候调用
  • destroy会在任务销毁时候调用
    /**
     * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
     */
    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
    public void demoJobHandler2() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");
    }

    public void init() {
        log.info("init");
    }

    public void destroy() {
        log.info("destory");
    }

9. 邮件报警

xxl-job中配置邮箱发送信息

--- # 邮件配置
spring:
  mail:
    from: xxxxx	// 需要替换为自己的
    host: smtp.qq.com
    username: xxxxx // 需要替换为自己的
    password: xxxxx // 需要替换为自己的
    port: 25
    properties:
      mail:
        smtp:
          auth: true
          socketFactory:
            class: javax.net.ssl.SSLSocketFactory
          starttls:
            enable: true
            required: true

编写java代码

/**
 * 错误任务
 * @throws Exception
 */
@XxlJob(value = "errorHandler")
public void errorHandler() throws Exception {
    int i=1/0;
}

新增任务策略
在这里插入图片描述
执行成功后就可以看到报错信息了
在这里插入图片描述

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-17 01:32:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-17 01:32:01       101 阅读
  3. 在Django里面运行非项目文件

    2024-03-17 01:32:01       82 阅读
  4. Python语言-面向对象

    2024-03-17 01:32:01       91 阅读

热门阅读

  1. linux下自定义显示文件拷贝进度

    2024-03-17 01:32:01       42 阅读
  2. 2024/3/26

    2024/3/26

    2024-03-17 01:32:01      44 阅读