00、SpringBatch批处理
一、介绍
1、什么是批处理?
定义: 将数据分批次进行处理的过程。
例如: 银行对账文件处理、跨系统数据同步等。
常规批处理步骤: 系统A从数据库中导出数据到文件,系统B读取文件数据并写入到数据库。
批处理特点:
- 自动执行: 根据系统设定的工作步骤自动完成
- 数据量大: 少则百万,多则上千万甚至上亿。如果数据量更大则需要只能用大数据了。
- 定时执行: 例如,每天、每周、每月执行。
2、官网
官网地址: Overview :: Spring Batch
根据官网总结如下:
Sping Batch
是一个轻量级、完善的批处理框架,帮助企业建立健壮、高效的批处理应用。Sping Batch
是一个基于Spring
框架为基础的开发的框架,是Spring
的一个子项目。Sping Batch
提供大量可重用的组件。例如:日志,追踪,事务,任务作业统计,任务重启,跳过,重复,资源管理等等。Spring Batch
是一个 批处理应用框架,不提供调度框架 。如果需要定时处理需要额外引入 调度框架 。例如:Quartz
。
3、优势
Spring Batch 框架通过提供丰富的 开箱即用的组件 和 高可靠性 、高扩展性 的能力。使得开发批处理应用的人员专注于业务处理,提高处理应用的开发能力。
丰富的开箱即用组件
面向Chunk的处理
事务管理能力
元数据管理
易监控的批处理应用
丰富的流程定义
健壮的批处理应用
易扩展的批处理应用
复用企业现有的IT代码
4、组织架构
Application: 应用层。包含所有的批处理作业,程序员自定义代码实现逻辑。
Batch Core: 核心层。包含 Spring Batch
启动 和 控制 所需要的核心类。例如:JobLauncher, Job,Step 等。
Batch Infrastructure: 基础架构层。提供通用的 读 、写 、服务处理。
三层体系使得 Spring Batch
架构可以在不同层面进行扩展,避免影响,实现高内聚低耦合设计。
5、程序运行架构图
JobLauncher: 作业调度器。作业启动的主要入口。
Job: 作业。需要执行的任务逻辑。
Step: 作业步骤。一个 Job 作业由1个或者多个 Step 组成。完成所有 Step 操作,一个完整 Job 才算执行结束。
ItemReader: Step 步骤执行过程中数据输入。可以从数据源(文件系统、数据库、队列等)中读取 Item (数据记录)。
ItemProcessor: Item 数据加工逻辑(输入)。比如:数据清洗、数据转换、数据过滤、数据校验等。
ItemWriter: Step 步骤执行过程中数据输出。将 Item(数据记录) 写入数据源(文件系统、数据库、队列等)。
JobRepository: 保存Job 或者 检索Job 的信息。 Spring Batch 需要 持久化Job(数据库/内存), JobRepository 就是持久化的接口。
二、入门案例-H2版(内存)
需求: 打印一个 hello spring batch! 不带读/写/处理
H2 是一个嵌入式内存数据库
1、新建项目
新建 Spring Boot 项目:springbatch-demo
本次案例使用环境版本:
# Spring Batch 4.x.x通常匹配Spring Boot 2.x.x
jdk 17
Spring Boot 2.6.15
Spring Batch 4.3.8
此步骤省略~
2、引入依赖
<!-- spring batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- H2 -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
3、新建HelloJob.java
package com.ulanhada.springbatchdemo.hello;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import javax.annotation.Resource;
@EnableBatchProcessing
public class HelloJob {
// job调度器
@Resource
private JobLauncher jobLauncher;
// job构造器工厂
@Resource
private JobBuilderFactory jobBuilderFactory;
// step构造器工厂
@Resource
private StepBuilderFactory stepBuilderFactory;
// 任务-step执行逻辑由tasklet完成
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception{
System.out.println("Hello SpringBatch....");
return RepeatStatus.FINISHED;
}
};
}
//作业步骤-不带读/写/处理
@Bean
public Step step1(){
return stepBuilderFactory
.get("step1")
.tasklet(tasklet())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory
.get("helloJob")
.start(step1())
.build();
}
}
三、入门案例-MySQL版
1、引入依赖
<!-- H2 -->
<!-- <dependency>-->
<!-- <groupId>com.h2database</groupId>-->
<!-- <artifactId>h2</artifactId>-->
<!-- <scope>runtime</scope>-->
<!-- </dependency>-->
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
2、修改 application.yml
大家根据实际情况修改~
spring:
application:
name: springbatch-demo
datasource:
username: root
password: 123456
url: jdbc:mysql://192.168.250.128:3306/springbatch?serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.cj.jdbc.Driver
# 初始化数据库,文件在依赖jar包中
sql:
init:
schema-locations: classpath:org/springframework/batch/core/schema-mysql.sql
mode: always
# mode: never
⚠️注意:
sql.init.model
: 首次启动为 always , 后面启动需要改为 never ,否则每次执行SQL都会异常。第一次启动会自动执行指定的脚本,后续不需要再初始化。
3、验证
四、案例解析
1、@EnableBatchProcessing
批处理启动注解,要求添加在 配置类 或者 启动类 上。
添加 @EnableBatchProcessing 注解后,SpringBoot 会自动加载JobLauncher,JobBuilderFactory,StepBuilderFactory 类并创建对象交给容器管理。使用时,直接 @Resource 即可。
2、配置数据库四要素
批处理允许重复执行、异常重试,此时需要保存 批处理状态 与 数据 。Spring Batch 将数据缓存在 H2 内存中或者缓存在指定 数据库 中。入门案例是保存在 MySQL 中,所以需要配置 数据库四要素 。
2、创建Tasklet对象
Tasklet
负责批处理 Step
步骤中具体业务执行。
Tasklet
是一个接口,有且只有一个 execute
方法 ,用于定制 Step
执行逻辑。
execute() 方法返回值是一个状态枚举类:RepeatStatus,里面有 可继续执行态 与 已经完成态 。
3、创建Step对象
Job作业 执行实际就是 Step步骤 执行。入门案例选用最简单的 Tasklet 模式,后续再介绍 Chunk块处理 模式。
4、创建Job并执行Job
创建 Job 对象交给容器管理,当 Spring Boot 启动之后,会自动去从容器中加载 Job 对象。并将 Job 对象交给 JobLauncherApplicationRunner 类,再借助 JobLauncher 类实现 Job 执行。
到这里 00、SpringBatch 4.x.x版本:简单入门 就结束了!!!🎉🎉🎉
后续接 01、SpringBatch 4.x.x版本:作业对象 Job 📣📣📣
欢迎小伙伴们学习和指正!!!😊😊😊
祝大家学习和工作一切顺利!!!😎😎😎