springboot整合springbatch批处理

springboot整合springbatch实现批处理

简介

springbatch中文官方文档:Spring Batch中文文档

项目搭建

参考博客【场景实战】Spring Boot + Spring Batch 实现批处理任务,保姆级教程

读取文件数据-处理数据-写入数据库

1.建表
建表sql

-- Student table: one row per student with the three subject scores and contact details.
-- The batch job's writer inserts every column except `id`, which the database generates.
CREATE TABLE `student` (
  `id` int NOT NULL AUTO_INCREMENT, -- surrogate primary key
  `name` varchar(100) NOT NULL COMMENT '姓名', -- student name
  `class_name` varchar(20) DEFAULT NULL COMMENT '班级名称', -- class name, optional
  -- The three scores are optional; decimal(4) has no scale given, so MySQL stores whole numbers only.
  `china_score` decimal(4) DEFAULT NULL COMMENT '语文成绩', -- Chinese score
  `math_score` decimal(4) DEFAULT NULL COMMENT '数学成绩', -- maths score
  `english_score` decimal(4) DEFAULT NULL COMMENT '英语成绩', -- English score
  `sex` tinyint(1) NOT NULL COMMENT '性别:0-男,1-女', -- sex: 0 = male, 1 = female
  `birthday` date NOT NULL COMMENT '生日', -- date of birth
  `card_id` varchar(20) NOT NULL COMMENT '身份证号', -- national ID number, unique per student (see key below)
  `phone` varchar(20) NOT NULL COMMENT '手机号', -- mobile phone number
  PRIMARY KEY (`id`),
  UNIQUE KEY `card_id` (`card_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='学生表'

2.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + Spring Batch sample project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>springbatch_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <!-- The parent manages the versions of all Spring Boot starters used below. -->
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.3.5.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Version inherited from spring-boot-starter-parent. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3</version>
        </dependency>

        <!-- Swagger UI page -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- Version inherited from spring-boot-starter-parent. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <!-- Test-only libraries: keep them off the runtime classpath and out of the packaged artifact. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>6.2.2.Final</version>
        </dependency>
    </dependencies>

</project>

3.启动类

/**
 * Application entry point: a Spring Boot application with Swagger 2 enabled.
 */
@EnableSwagger2
@SpringBootApplication
public class BatchService {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(BatchService.class, args);
    }
}

4.配置文件

# Application settings: HTTP port, MySQL datasource and Spring Batch start-up behaviour.
server:
  port: 8081
spring:
  application:
    name: spring-batch-study
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?serverTimeZone=Asia/Shanghai&characterEncoding=utf-8
    username: root
    password: root
  batch:
    job:
      enabled: false # jobs are not run at startup; they have to be started through jobLauncher.run
    initialize-schema: never # use "always" while the built-in batch tables do not exist yet, then switch to "never" once they are created

注意:第一次运行时需将 spring.batch.initialize-schema 设置为 always,启动后会自动生成 Spring Batch 内置表;内置表创建完成后再改回 never。
5.实体类

package com.test.batch.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;

import java.math.BigDecimal;
import java.util.Date;

/**
 * Row of the {@code student} table; also the item type that the batch job reads,
 * processes and writes. Getters, setters, equals/hashCode and toString come from
 * Lombok's {@code @Data}.
 *
 * @author 1
 */
@Data
public class Student {

    /** Primary key, generated by the database (auto-increment). */
    @TableId(type = IdType.AUTO)
    private Integer id;
    /**
     * Student name.
     */
    private String name;
    /**
     * Class name.
     */
    private String className;

    /**
     * Chinese score.
     */
    private BigDecimal chinaScore;

    /**
     * Maths score.
     */
    private BigDecimal mathScore;
    /**
     * English score.
     */
    private BigDecimal englishScore;

    /**
     * Sex: 0 = male, 1 = female.
     */
    private Integer sex;

    /**
     * Date of birth; the JSON representation uses the pattern yyyy-MM-dd.
     */
    @JsonFormat(pattern = "yyyy-MM-dd")
    private Date birthday;

    /**
     * National ID number.
     */
    private String cardId;
    /**
     * Mobile phone number.
     */
    private String phone;

}

6.batch核心配置类

package com.test.batch.config;

import com.test.batch.entity.Student;
import com.test.batch.listen.MyBeanValidator;
import com.test.batch.listen.MyJobListener;
import com.test.batch.listen.MyReaderListener;
import com.test.batch.listen.MyWriteListener;
import com.test.batch.processor.MyProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Spring Batch wiring for the student import job: read {@code static/student.csv},
 * validate/inspect every record, and insert the records into the {@code student} table.
 *
 * @author 1
 */
@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {

    /** Pattern of the birthday column in the CSV file ({@code MM} = month; {@code mm} would mean minutes). */
    private static final String BIRTHDAY_PATTERN = "yyyy-MM-dd";

    /**
     * Job repository that persists batch metadata in the MySQL datasource.
     *
     * @param dataSource         datasource holding the batch metadata tables
     * @param transactionManager transaction manager used for repository operations
     * @return the configured job repository
     * @throws Exception if the factory bean cannot create the repository
     */
    @Bean
    public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDataSource(dataSource);
        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * Job launcher bound to {@link #myJobRepository(DataSource, PlatformTransactionManager)}.
     *
     * @param dataSource         datasource passed on to the job repository
     * @param transactionManager transaction manager passed on to the job repository
     * @return the launcher used to start jobs programmatically
     * @throws Exception if the job repository cannot be created
     */
    @Bean
    public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    /**
     * Defines the job: a single-step flow with a run-id incrementer and a job listener.
     *
     * @param jobBuilderFactory factory used to build the job
     * @param myStep            the only step of the job
     * @return the job named "myJob"
     */
    @Bean
    public Job myJob(JobBuilderFactory jobBuilderFactory, Step myStep) {
        return jobBuilderFactory.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .listener(myJobListener())
                .build();
    }

    /**
     * Registers the job listener.
     *
     * @return listener that logs job start and end
     */
    @Bean
    public MyJobListener myJobListener() {
        return new MyJobListener();
    }

    /**
     * Item reader: reads {@code static/student.csv} from the classpath and maps each
     * delimited line onto a {@link Student}.
     *
     * @return the CSV reader
     */
    @Bean
    public ItemReader<Student> reader() {
        // Column names in file order; each one names the Student property it is bound to.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("name", "className", "chinaScore", "mathScore", "englishScore",
                "sex", "birthday", "cardId", "phone");

        BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Student.class);
        // Needed for the String -> Date conversion of the birthday column.
        fieldSetMapper.setConversionService(createConversionService());

        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Student> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("static/student.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    /**
     * Builds the conversion service used when binding CSV fields: the default converters
     * plus a strict {@code yyyy-MM-dd} String-to-Date converter.
     *
     * @return the conversion service
     */
    public ConversionService createConversionService() {
        // new DefaultConversionService() already registers the default converters.
        DefaultConversionService conversionService = new DefaultConversionService();
        // Kept as an anonymous class: addConverter resolves the source/target types from the
        // class's generic signature, which a lambda would not carry.
        conversionService.addConverter(new Converter<String, Date>() {

            @Override
            public Date convert(String text) {
                // SimpleDateFormat is not thread-safe, so a new instance is created per call.
                SimpleDateFormat format = new SimpleDateFormat(BIRTHDAY_PATTERN);
                format.setLenient(false);
                try {
                    return format.parse(text);
                } catch (java.text.ParseException e) {
                    // Fail the conversion instead of silently substituting today's date,
                    // which would store a made-up birthday.
                    throw new IllegalArgumentException(
                            "Invalid date '" + text + "', expected pattern " + BIRTHDAY_PATTERN, e);
                }
            }
        });
        return conversionService;
    }

    /**
     * Item processor: validates each student and logs noteworthy score combinations.
     *
     * @return the processor with the bean validator attached
     */
    @Bean
    public ItemProcessor<Student, Student> processor() {
        MyProcessor myProcessor = new MyProcessor();
        myProcessor.setValidator(myBeanValidator());
        return myProcessor;
    }

    /**
     * Validator handed to the processor.
     *
     * @return the bean validator for {@link Student}
     */
    @Bean
    public MyBeanValidator<Student> myBeanValidator() {
        return new MyBeanValidator<Student>();
    }

    /**
     * Item writer: batch-inserts students through JDBC, binding the named SQL
     * parameters to the bean properties of each {@link Student}.
     *
     * @param dataSource target datasource
     * @return the JDBC batch writer
     */
    @Bean
    public ItemWriter<Student> writer(DataSource dataSource) {
        JdbcBatchItemWriter<Student> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        String sql = "insert into student (name,class_name,china_score,math_score,english_score,sex,birthday,card_id,phone) " +
                "values (:name,:className,:chinaScore,:mathScore,:englishScore,:sex,:birthday,:cardId,:phone)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    /**
     * Defines the chunk-oriented step (chunk size 5000) with fault tolerance:
     * retry limit 3 and skip limit 2, both for any {@link Exception}.
     *
     * @param factory   factory used to build the step
     * @param reader    CSV reader
     * @param writer    JDBC writer
     * @param processor validating processor
     * @return the step named "myStep"
     */
    @Bean
    public Step myStep(StepBuilderFactory factory, ItemReader<Student> reader, ItemWriter<Student> writer, ItemProcessor<Student, Student> processor) {
        return factory.get("myStep")
                .<Student, Student>chunk(5000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                // Retry and skip are configured once on a single fault-tolerant builder,
                // so neither setting depends on a second faultTolerant() call.
                .faultTolerant()
                .retryLimit(3)
                .retry(Exception.class)
                .skipLimit(2)
                .skip(Exception.class)
                .listener(new MyReaderListener())
                .listener(new MyWriteListener())
                .build();
    }
}

7.自定义处理器

package com.test.batch.processor;

import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;


/**
 * Item processor that validates every student and logs the students whose three
 * scores are either all excellent or all failing. Items are passed on unchanged.
 *
 * @author 1
 */
@Slf4j
public class MyProcessor extends ValidatingItemProcessor<Student> {

    /** Minimum score (inclusive) that counts as excellent. */
    private static final int GOOD = 90;

    /** Scores strictly below this value count as failing. */
    private static final int BAD = 60;

    /**
     * Validates the item, then logs a remark for all-excellent or all-failing students.
     *
     * @param item the student read from the input
     * @return the same item, or {@code null} when the parent processor filtered it out
     * @throws ValidationException if validation fails and the processor is not in filter mode
     */
    @Override
    public Student process(Student item) throws ValidationException {
        // The configured validator only runs inside super.process(item). A null result
        // means the item was filtered out, so it must not be passed on.
        if (super.process(item) == null) {
            return null;
        }

        // Scores are optional; without all three there is nothing to grade.
        if (item.getChinaScore() == null || item.getMathScore() == null || item.getEnglishScore() == null) {
            return item;
        }

        int chinaScore = item.getChinaScore().intValue();
        int mathScore = item.getMathScore().intValue();
        int englishScore = item.getEnglishScore().intValue();
        String name = item.getName();

        if (chinaScore >= GOOD && mathScore >= GOOD && englishScore >= GOOD) {
            log.info("{}同学三科成绩均为90以上,应该给予奖励", name);
        }
        if (chinaScore < BAD && mathScore < BAD && englishScore < BAD) {
            log.info("{}同学三科成绩均不及格,建议通知家长,电话:{}", name, item.getPhone());
        }
        return item;
    }
}

8.job监听器

package com.test.batch.listen;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job-level listener that logs the job id before a job starts and after it ends.
 *
 * @author 1
 */
@Slf4j
public class MyJobListener implements JobExecutionListener {

    /**
     * Logs the id of the job that is about to run.
     *
     * @param jobExecution the execution that is starting
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        final Long startedJobId = jobExecution.getJobId();
        log.info("job开始,id:{}", startedJobId);
    }

    /**
     * Logs the id of the job that has finished.
     *
     * @param jobExecution the execution that has ended
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        final Long finishedJobId = jobExecution.getJobId();
        log.info("id:{}", finishedJobId);
    }
}

9.读组件监听器

package com.test.batch.listen;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;

import static java.lang.String.format;

/**
 * @author 1
 */
@Slf4j
public class MyReaderListener implements ItemReadListener {
   
    @Override
    public void beforeRead() {
   

    }

    @Override
    public void afterRead(Object o) {
   

    }

    @Override
    public void onReadError(Exception e) {
   
        log.error("读取数据失败:{}",e);
        log.info("item error:"+format("%s%n", e.getMessage()));
    }
}

10.写组件监听器

package com.test.batch.listen;

import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;

import java.util.List;

import static java.lang.String.format;

/**
 * @author 1
 */
@Slf4j
public class MyWriteListener implements ItemWriteListener<Student> {
   

    @Override
    public void beforeWrite(List<? extends Student> list) {
   

    }

    @Override
    public void afterWrite(List<? extends Student> list) {
   

    }

    @Override
    public void onWriteError(Exception e, List<? extends Student> list) {
   
        try {
   
            log.info(format("%s%n", e.getMessage()));
            for (Student message : list) {
   
                log.info(format("Failed writing Students : %s", message.toString()));
            }

        } catch (Exception ex) {
   
            log.error("format error :{}",ex);
        }
    }
}

11.字段校验

package com.test.batch.listen;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;

/**
 * Spring Batch {@link Validator} that delegates to a {@code javax.validation} validator
 * and reports all constraint violations in a single {@link ValidationException}.
 *
 * @param <T> type of the validated items
 * @author 1
 */
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {

    /** Bean Validation delegate, created in {@link #afterPropertiesSet()}. */
    private javax.validation.Validator validator;

    /**
     * Validates the item with the delegate.
     *
     * @param t the item to validate
     * @throws ValidationException if at least one constraint is violated; the message holds
     *                             every violation message, each followed by a line break
     */
    @Override
    public void validate(T t) throws ValidationException {
        Set<ConstraintViolation<T>> violations = validator.validate(t);
        if (violations.isEmpty()) {
            return;
        }

        StringBuilder details = new StringBuilder();
        for (ConstraintViolation<T> violation : violations) {
            details.append(violation.getMessage()).append("\n");
        }
        throw new ValidationException(details.toString());
    }

    /**
     * Creates the Bean Validation delegate from the default validator factory.
     *
     * @throws Exception declared by {@link InitializingBean}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
        validator = factory.usingContext().getValidator();
    }
}

12.接口

package com.test.batch.controller;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @author 1
 */
@RestController
@Slf4j
public class TestController {
   

    @Autowired
    SimpleJobLauncher launcher;

    @Autowired
    private Job job;

    @GetMapping("testJob")
    public ResponseEntity testJob(){
   
        try {
   
            //job添加参数,确保每个job都唯一
            JobParameters jobParameters = new JobParametersBuilder().addDate("date",new Date()).toJobParameters();
            launcher.run(job,jobParameters);
        }catch (Exception e){
   
            log.error("job error:{}",e);
            return ResponseEntity.ok(e.getMessage());
        }
        return ResponseEntity.ok("操作成功!!!");
    }
}

13.数据
在这里插入图片描述
14.运行后浏览器输入
http://localhost:8081/doc.html
或页面输入localhost:8081/testJob,文件内容成功写入数据库
控制台输出
在这里插入图片描述

步骤

业务需求:从文件读取数据,经过处理后写入数据库
源码:springbatch案例

相关推荐

  1. SpringBoot整理-错误处理

    2024-01-08 03:46:03       31 阅读
  2. Springboot整合hibernate validator 全局异常处理

    2024-01-08 03:46:03       31 阅读
  3. Redis-处理

    2024-01-08 03:46:03       15 阅读
  4. 处理算法

    2024-01-08 03:46:03       12 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-08 03:46:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-08 03:46:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-08 03:46:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-08 03:46:03       20 阅读

热门阅读

  1. SpringBoot 中实现订单30分钟自动取消的策略

    2024-01-08 03:46:03       38 阅读
  2. Docker - 启动 MySQL 闪退解决方案

    2024-01-08 03:46:03       33 阅读
  3. 实现一个网页聊天室

    2024-01-08 03:46:03       31 阅读
  4. 讲解eureca和nacus的区别

    2024-01-08 03:46:03       33 阅读
  5. Kvaser使用(Can总线)

    2024-01-08 03:46:03       153 阅读
  6. 数据库连接使用问题 - 1

    2024-01-08 03:46:03       33 阅读
  7. Docker学习笔记(一):Docker命令总结

    2024-01-08 03:46:03       39 阅读
  8. 学习录

    学习录

    2024-01-08 03:46:03      37 阅读
  9. 【深度学习在时序数据异常检测中的创新】

    2024-01-08 03:46:03       30 阅读
  10. tyxsspa/AnyText 阿里生成文字

    2024-01-08 03:46:03       35 阅读