阿里指数查询官网入口,大城网站优化,什么是seo营销,广东企业网站建设策划springboot整合springbatch实现批处理 简介项目搭建步骤 简介
项目搭建
参考博客【场景实战】Spring Boot Spring Batch 实现批处理任务#xff0c;保姆级教程
步骤
1.建表 建表sql
CREATE TABLE student (id int NOT NULL AUTO_INCREMENT,name varchar(100) NOT NULL C… springboot整合springbatch实现批处理 简介项目搭建步骤 简介
项目搭建
参考博客【场景实战】Spring Boot Spring Batch 实现批处理任务保姆级教程
步骤
1.建表 建表sql
CREATE TABLE student (id int NOT NULL AUTO_INCREMENT,name varchar(100) NOT NULL COMMENT 姓名,class_name varchar(20) DEFAULT NULL COMMENT 班级名称,china_score varchar(4) DEFAULT NULL COMMENT 语文成绩,math_score varchar(4) DEFAULT NULL COMMENT 数学成绩,english_score varchar(4) DEFAULT NULL COMMENT 英语成绩,sex tinyint(1) NOT NULL COMMENT 性别0-男1-女,birthday date NOT NULL COMMENT 生日,card_id varchar(20) NOT NULL COMMENT 身份证号,phone varchar(20) NOT NULL COMMENT 手机号,PRIMARY KEY (id),UNIQUE KEY card_id (card_id)
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci COMMENT学生表2.pom文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdspringbatch_study/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesparentartifactIdspring-boot-starter-parent/artifactIdgroupIdorg.springframework.boot/groupIdversion2.3.5.RELEASE/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactIdversion2.3.5.RELEASE/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.33/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.5.3/version/dependency!--swagger页面--dependencygroupIdcom.github.xiaoymin/groupIdartifactIdknife4j-spring-boot-starter/artifactIdversion2.0.0/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-batch/artifactIdversion2.3.5.RELEASE/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdversion2.3.5.RELEASE/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.24/version/dependencydependencygroupIdorg.hibernate.validator/groupIdartifactIdhibernate-validator/artifactIdversion6.2.2.Final/version/dependency/dependencies/project3.启动类
SpringBootApplication
EnableSwagger2
public class BatchService {public static void main(String[] args) {SpringApplication.run(BatchService.class,args);}}4.配置文件
server:port: 8081
spring:application:name: spring-batch-studydatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?serverTimeZoneAsia/ShanghaicharacterEncodingutf-8username: rootpassword: rootbatch:job:enabled: false #需要jobLaucher.run执行initialize-schema: never #第一次没有新建batch内置表时为always创建内置表后设置为never注意spring.batch.initialize-schema第一次运行时写为always运行后会自动生产batch内置表 5.实体类
package com.test.batch.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;import java.util.Date;/*** author 1*/
Data
public class Student {TableId(type IdType.AUTO)private Integer id;/*** 姓名*/private String name;/*** 班级名称*/private String className;/*** 语文成绩*/private String chinaScore;/*** 数学成绩*/private String mathScore;/*** 英语成绩*/private String englishScore;/*** 性别0-男1-女*/private Integer sex;/*** 生日*/JsonFormat(pattern yyyy-MM-dd)private Date birthday;/*** 身份证号*/private String cardId;/*** 手机号*/private String phone;}
6.batch核心配置类
package com.test.batch.config;import com.test.batch.entity.Student;
import com.test.batch.listen.MyBeanValidator;
import com.test.batch.listen.MyJobListener;
import com.test.batch.listen.MyReaderListener;
import com.test.batch.listen.MyWriteListener;
import com.test.batch.processor.MyProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;/*** author 1*/
Configuration
EnableBatchProcessing
Slf4j
public class BatchConfig {/*** JobRepository定义及数据库的操作* param dataSource* param transactionManager* return* throws Exception*/Beanpublic JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)throws Exception{JobRepositoryFactoryBean jobRepositoryFactoryBean new JobRepositoryFactoryBean();jobRepositoryFactoryBean.setDatabaseType(mysql);jobRepositoryFactoryBean.setTransactionManager(transactionManager);jobRepositoryFactoryBean.setDataSource(dataSource);return jobRepositoryFactoryBean.getObject();}/*** JobLauncher:job的启动器绑定相关的Repository* param dataSource* param transactionManager* return* throws Exception*/Beanpublic SimpleJobLauncher myJobLauncher(DataSource dataSource,PlatformTransactionManager transactionManager)throws Exception{SimpleJobLauncher jobLauncher new SimpleJobLauncher();jobLauncher.setJobRepository(myJobRepository(dataSource,transactionManager));return jobLauncher;}/*** 定义job* param jobBuilderFactory* param myStep* return*/Beanpublic Job myJob(JobBuilderFactory jobBuilderFactory, Step myStep){return jobBuilderFactory.get(myJob).incrementer(new RunIdIncrementer()).flow(myStep).end().listener(myJobListener()).build();}/*** 注册job监听器* return*/Beanpublic MyJobListener myJobListener(){return new MyJobListener();}/*** 定义itemReader读取文件数据entity实体映射* return*/Beanpublic ItemReaderStudent reader(){FlatFileItemReaderStudent reader new FlatFileItemReader();//设置文件路径reader.setResource(new ClassPathResource(static/student.csv));reader.setLineMapper(new DefaultLineMapperStudent(){{setLineTokenizer(new DelimitedLineTokenizer(){{setNames(new String[]{name,className,chinaScore,mathScore,englishScore,sex,birthday,cardIdd,phone});}});setFieldSetMapper(new BeanWrapperFieldSetMapperStudent(){{setTargetType(Student.class);//设置日期转换setConversionService(createConversionService());}});}});return reader;}public ConversionService createConversionService() {DefaultConversionService conversionService new DefaultConversionService();DefaultConversionService.addDefaultConverters(conversionService);conversionService.addConverter(new ConverterString, Date() {Overridepublic Date convert(String text) {SimpleDateFormat sdf new SimpleDateFormat(yyyy-mm-dd);Date date new Date();try {date sdf.parse(text);}catch (Exception e){log.error(日期转换异常 {},e);}return date;}});return conversionService;}/*** 注册ItemProcessor处理数据* return*/Beanpublic ItemProcessorStudent,Student processor(){MyProcessor myProcessor new MyProcessor();myProcessor.setValidator(myBeanValidator());return myProcessor;}Beanpublic MyBeanValidator myBeanValidator(){return new MyBeanValidatorStudent();}/*** 定义ItemWriter指定DataSource,设置批量插入sql语句写入数据库* param dataSource* return*/Beanpublic ItemWriterStudent writer(DataSource dataSource){JdbcBatchItemWriterStudent writer new JdbcBatchItemWriter();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());String sql insert into student (name,class_name,china_score,math_score,english_score,sex,birthday,card_id,phone) values (:name,:className,:chinaScore,:mathScore,:englishScore,:sex,:birthday,:cardId,:phone);writer.setSql(sql);writer.setDataSource(dataSource);return writer;}Beanpublic Step myStep(StepBuilderFactory factory,ItemReaderStudent reader,ItemWriterStudent writer,ItemProcessorStudent,Student processor){return factory.get(myStep).Student,Studentchunk(5000).reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2).listener(new MyReaderListener()).processor(processor).writer(writer).faultTolerant().skip(Exception.class).skipLimit(2).listener(new MyWriteListener()).build();}
}
7.自定义处理器
package com.test.batch.processor;import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;/*** author 1*/
Slf4j
public class MyProcessor extends ValidatingItemProcessorStudent {private Integer GOOD 90;private Integer BAD 60;Overridepublic Student process(Student item) throws ValidationException {/*** 需要执行super.process(item)才会调用自定义校验器*/super.process(item);String chinaScore item.getChinaScore();String mathScore item.getMathScore();String englishScore item.getEnglishScore();String name item.getName();String phone item.getPhone();if (GOOD Double.parseDouble(chinaScore) GOOD Double.parseDouble(mathScore) GOOD Double.parseDouble(englishScore)){log.info({}同学三科成绩均为90以上应该给予奖励, name);}if (BAD Double.parseDouble(chinaScore) BAD Double.parseDouble(mathScore) BAD Double.parseDouble(englishScore)){log.info({}同学三科成绩均不及格建议通知家长电话{}, name,phone);}return item;}
}
8.job监听器
package com.test.batch.listen;import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;/*** author 1*/
Slf4j
public class MyJobListener implements JobExecutionListener {Overridepublic void beforeJob(JobExecution jobExecution) {log.info(job开始id:{},jobExecution.getJobId());}Overridepublic void afterJob(JobExecution jobExecution) {log.info(id:{},jobExecution.getJobId());}
}
9.读组件监听器
package com.test.batch.listen;import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;import static java.lang.String.format;/*** author 1*/
Slf4j
public class MyReaderListener implements ItemReadListener {Overridepublic void beforeRead() {}Overridepublic void afterRead(Object o) {}Overridepublic void onReadError(Exception e) {log.error(读取数据失败{},e);log.info(item error:format(%s%n, e.getMessage()));}
}
10.写组件监听器
package com.test.batch.listen;import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;import java.util.List;import static java.lang.String.format;/*** author 1*/
Slf4j
public class MyWriteListener implements ItemWriteListenerStudent {Overridepublic void beforeWrite(List? extends Student list) {}Overridepublic void afterWrite(List? extends Student list) {}Overridepublic void onWriteError(Exception e, List? extends Student list) {try {log.info(format(%s%n, e.getMessage()));for (Student message : list) {log.info(format(Failed writing Students : %s, message.toString()));}} catch (Exception ex) {log.error(format error :{},ex);}}
}
11.字段校验
package com.test.batch.listen;import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;/*** author 1*/
public class MyBeanValidatorT implements ValidatorT, InitializingBean {private javax.validation.Validator validator;Overridepublic void validate(T t) throws ValidationException {/*** 使用Validator的validate方法校验数据*/SetConstraintViolationT constraintViolations validator.validate(t);if (constraintViolations.size() 0) {StringBuilder message new StringBuilder();for (ConstraintViolationT constraintViolation : constraintViolations) {message.append(constraintViolation.getMessage() \n);}throw new ValidationException(message.toString());}}Overridepublic void afterPropertiesSet() throws Exception {ValidatorFactory validatorFactory Validation.buildDefaultValidatorFactory();validator validatorFactory.usingContext().getValidator();}
}
12.接口
package com.test.batch.controller;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** author 1*/
RestController
Slf4j
public class TestController {AutowiredSimpleJobLauncher launcher;Autowiredprivate Job job;GetMapping(testJob)public ResponseEntity testJob(){try {//job添加参数确保每个job都唯一JobParameters jobParameters new JobParametersBuilder().addDate(date,new Date()).toJobParameters();launcher.run(job,jobParameters);}catch (Exception e){log.error(job error:{},e);return ResponseEntity.ok(e.getMessage());}return ResponseEntity.ok(操作成功);}
}
13.数据 14.运行后浏览器输入 http://localhost:8081/doc.html 或页面输入localhost:8081/testJob,文件内容成功写入数据库