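Spring Batch is a lightweight framework for batch processing, including parallel processing of large volumes of data.

Its role resembles Hadoop's, but Hadoop targets heavyweight distributed environments (very large data sets), whereas Spring Batch is a lightweight application framework (small to medium data sets).

This post builds the simplest runnable Spring Batch example and uses it to describe what Spring Batch basically does.

For deeper study, see the reference documentation at https://docs.spring.io/spring-batch/4.0.x/reference/html/index.html ; if your English is not great, do what I do and right-click to translate the page.

Tech stack: Spring Boot + Spring Batch + JPA. The complete demo project is at https://github.com/EalenXie/springboot-batch

1. Create a project named springboot-batch with these basic pom.xml dependencies: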

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>name.ealen</groupId>
<artifactId>springboot-batch</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>

2. Spring Batch needs its metadata tables in the database, so run the following script, which comes from the official metadata schema.

-- do not edit this file
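-- BATCH_JOB_INSTANCE: holds all information relevant to a JobInstance
-- JOB_INSTANCE_ID: assigned from BATCH_JOB_SEQ
-- JOB_NAME: the job name, matching the Spring configuration
-- JOB_KEY: an MD5 hash of the job parameters. Because of this column, once a job has completed successfully, launching it again with the same parameters throws JobInstanceAlreadyCompleteException.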
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
-- BATCH_JOB_EXECUTION: holds all information relevant to the JobExecution object
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME NOT NULL,
START_TIME DATETIME DEFAULT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
-- BATCH_JOB_EXECUTION_PARAMS: holds all information relevant to the JobParameters object
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_STEP_EXECUTION: holds all information relevant to the StepExecution object
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME DATETIME NOT NULL ,
END_TIME DATETIME DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME,
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_STEP_EXECUTION_CONTEXT: holds all ExecutionContext information for a Step
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_JOB_EXECUTION_CONTEXT: holds all ExecutionContext information for a Job
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
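
The JOB_KEY comment in the script above is easier to picture with a small plain-Java sketch. This is a simplified illustration, not Spring Batch's actual key generator: the class `JobKeySketch`, the `key=value;` joining format, and the in-memory set standing in for the unique (JOB_NAME, JOB_KEY) constraint are all assumptions made for the example. It only shows the idea that equal parameters hash to the same 32-character key, so a second launch with the same parameters is rejected, while a changed `run.id` yields a new instance.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified illustration only; not Spring Batch's real implementation.
final class JobKeySketch {
    // Stands in for the unique (JOB_NAME, JOB_KEY) constraint on BATCH_JOB_INSTANCE.
    private final Set<String> completedInstances = new HashSet<>();

    // Hashes the parameters in sorted key order so equal parameter sets give equal keys.
    static String jobKey(Map<String, String> params) {
        StringBuilder joined = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<String, String>(params).entrySet()) {
            joined.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(joined.toString().getBytes(StandardCharsets.UTF_8));
            // 32 hex characters, which is why JOB_KEY is VARCHAR(32).
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("MD5 not available", ex);
        }
    }

    // Returns false when an instance with the same name and key was already recorded.
    boolean launch(String jobName, Map<String, String> params) {
        return completedInstances.add(jobName + ":" + jobKey(params));
    }

    public static void main(String[] args) {
        JobKeySketch repo = new JobKeySketch();
        Map<String, String> run1 = Collections.singletonMap("run.id", "1");
        Map<String, String> run2 = Collections.singletonMap("run.id", "2");
        System.out.println(repo.launch("dataHandleJob", run1)); // true
        System.out.println(repo.launch("dataHandleJob", run1)); // false
        System.out.println(repo.launch("dataHandleJob", run2)); // true
    }
}
```

In the real framework the rejected launch surfaces as JobInstanceAlreadyCompleteException rather than a `false` return value.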

3. The entity class for the test data: Access.java

package name.ealen.model;

import javax.persistence.*;
/**
* Created by EalenXie on 2018/9/10 16:17.
*/
@Entity
@Table
public class Access {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Integer id;
private String username;
private String shopName;
private String categoryName;
private String brandName;
private String shopId;
private String omit;
private String updateTime;
private boolean deleteStatus;
private String createTime;
private String description;

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getShopName() {
return shopName;
}

public void setShopName(String shopName) {
this.shopName = shopName;
}

public String getCategoryName() {
return categoryName;
}

public void setCategoryName(String categoryName) {
this.categoryName = categoryName;
}

public String getBrandName() {
return brandName;
}

public void setBrandName(String brandName) {
this.brandName = brandName;
}

public String getShopId() {
return shopId;
}

public void setShopId(String shopId) {
this.shopId = shopId;
}

public String getOmit() {
return omit;
}

public void setOmit(String omit) {
this.omit = omit;
}

public String getUpdateTime() {
return updateTime;
}

public void setUpdateTime(String updateTime) {
this.updateTime = updateTime;
}

public boolean isDeleteStatus() {
return deleteStatus;
}

public void setDeleteStatus(boolean deleteStatus) {
this.deleteStatus = deleteStatus;
}

public String getCreateTime() {
return createTime;
}

public void setCreateTime(String createTime) {
this.createTime = createTime;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

@Override
public String toString() {
return "Access{" +
"id=" + id +
", username='" + username + '\'' +
", shopName='" + shopName + '\'' +
", categoryName='" + categoryName + '\'' +
", brandName='" + brandName + '\'' +
", shopId='" + shopId + '\'' +
", omit='" + omit + '\'' +
", updateTime='" + updateTime + '\'' +
", deleteStatus=" + deleteStatus +
", createTime='" + createTime + '\'' +
", description='" + description + '\'' +
'}';
}
}

4. Before configuring the simplest possible Job, prepare some basic configuration, such as a listener for the Job:

  Configure the TaskExecutor, ExecutorConfiguration.java

package name.ealen.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Configures the TaskExecutor
*/
@Configuration
public class ExecutorConfiguration {

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setMaxPoolSize(200);
threadPoolTaskExecutor.setQueueCapacity(1000);
threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");
return threadPoolTaskExecutor;
}
}
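
How the core size, max size, and queue capacity above interact is not obvious from the setters alone. ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, so the sketch below uses the JDK class directly with deliberately tiny numbers. The class `PoolGrowthSketch` and its helper names are hypothetical, and an ArrayBlockingQueue is assumed here as the bounded queue. It shows that threads beyond the core size are only created once the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK executor rather than Spring's wrapper.
final class PoolGrowthSketch {

    // Submits `tasks` blocking tasks and reports how many threads the pool created.
    static int poolSizeAfterSubmitting(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release)); // every task blocks until released
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterSubmitting(2, 4, 2, 2)); // 2: core threads only
        System.out.println(poolSizeAfterSubmitting(2, 4, 2, 4)); // 2: extra tasks wait in the queue
        System.out.println(poolSizeAfterSubmitting(2, 4, 2, 6)); // 4: queue full, pool grows to max
    }
}
```

Scaled up to the configuration above, this suggests the pool stays at 50 threads until 1000 tasks are queued, and only then grows toward 200.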

  Prepare a simple listener for the Job; implementing JobExecutionListener is enough:

package name.ealen.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* Created by EalenXie on 2018/9/10 15:09.
* A simple Job listener
*/
@Component
public class JobListener implements JobExecutionListener {
private static final Logger log = LoggerFactory.getLogger(JobListener.class);

@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

private long startTime;

@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
log.info("job before {}", jobExecution.getJobParameters());
}

@Override
public void afterJob(JobExecution jobExecution) {
log.info("JOB STATUS : {}", jobExecution.getStatus());
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("JOB FINISHED");
threadPoolTaskExecutor.destroy();
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
log.info("JOB FAILED");
}
log.info("Job Cost Time : {}ms" , (System.currentTimeMillis() - startTime));
}
}

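5. Configure the most basic Job. A Job is usually made up of one or more Steps (much like a workflow), and a Step usually has three parts: an ItemReader that reads the data, an ItemProcessor that processes it, and an ItemWriter that writes it.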
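
Before the real configuration, the reader / processor / writer split described above can be pictured as a plain-Java loop. This is a rough sketch under stated assumptions, not Spring Batch's implementation: `ChunkSketch` and `runStep` are hypothetical names, an Iterator stands in for the ItemReader, and transactions are left out. It reads up to `chunkSize` items, processes each one, and hands the whole chunk to the writer in a single call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a chunk-oriented step; no Spring Batch classes involved.
final class ChunkSketch {

    // Returns the number of chunks handed to the writer.
    static <I, O> int runStep(Iterator<I> reader, Function<I, O> processor,
                              Consumer<List<O>> writer, int chunkSize) {
        int chunksWritten = 0;
        while (reader.hasNext()) {
            // Read phase: collect up to chunkSize items.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: a null result means the item is filtered out.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // Write phase: one writer call per chunk.
            writer.accept(outputs);
            chunksWritten++;
        }
        return chunksWritten;
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        Function<Integer, String> processor = n -> "item-" + n;
        Consumer<List<String>> writer = chunk -> sizes.add(chunk.size());
        int chunks = runStep(Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator(), processor, writer, 3);
        System.out.println(chunks + " chunks, sizes " + sizes); // 3 chunks, sizes [3, 3, 1]
    }
}
```

The Spring Batch configuration that follows wires the same three roles through reader(...), processor(...), and writer(...), with chunk(100) playing the part of `chunkSize`.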

package name.ealen.batch;

import name.ealen.listener.JobListener;
import name.ealen.model.Access;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.persistence.EntityManagerFactory;

/**
* Created by EalenXie on 2018/9/10 14:50.
* The @EnableBatchProcessing annotation provides the base configuration for building batch jobs
*/
@Configuration
@EnableBatchProcessing
public class DataBatchConfiguration {
private static final Logger log = LoggerFactory.getLogger(DataBatchConfiguration.class);

@Resource
private JobBuilderFactory jobBuilderFactory; // builds Jobs

@Resource
private StepBuilderFactory stepBuilderFactory; // builds Steps

@Resource
private EntityManagerFactory emf; // injected factory used for data access

@Resource
private JobListener jobListener; // the simple Job listener

/**
* A basic Job is usually made up of one or more Steps
*/
@Bean
public Job dataHandleJob() {
return jobBuilderFactory.get("dataHandleJob").
incrementer(new RunIdIncrementer()).
start(handleDataStep()). // start() sets the first Step the Job runs
// next(xxxStep()).
// next(xxxStep()).
// ...
listener(jobListener). // registers the simple JobListener
build();
}

/**
* A basic Step has three main parts
* ItemReader : reads the data
* ItemProcessor : processes the data
* ItemWriter : writes the data
*/
@Bean
public Step handleDataStep() {
return stepBuilderFactory.get("getData").
<Access, Access>chunk(100). // <input, output>. A chunk is loosely like a SQL commit interval: here 100 items are processed, then written in one go.
faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class). // an item that throws is retried (retry limit 3); if it still fails it is skipped; once the skip limit of 100 is exceeded the Step fails and the Job is marked FAILED
reader(getDataReader()). // sets the ItemReader
processor(getDataProcessor()). // sets the ItemProcessor
writer(getDataWriter()). // sets the ItemWriter
build();
}

@Bean
public ItemReader<? extends Access> getDataReader() {
// Data can be read via JPA, JDBC, JMS and so on
JpaPagingItemReader<Access> reader = new JpaPagingItemReader<>();
// JPA is used here, with a simple native SQL query
String sqlQuery = "SELECT * FROM access";
try {
JpaNativeQueryProvider<Access> queryProvider = new JpaNativeQueryProvider<>();
queryProvider.setSqlQuery(sqlQuery);
queryProvider.setEntityClass(Access.class);
queryProvider.afterPropertiesSet();
reader.setEntityManagerFactory(emf);
reader.setPageSize(3);
reader.setQueryProvider(queryProvider);
reader.afterPropertiesSet();
// All ItemReader and ItemWriter implementations store their current state in the ExecutionContext before it is committed; call setSaveState(false) if you do not want that
reader.setSaveState(true);
} catch (Exception e) {
e.printStackTrace();
}
return reader;
}

@Bean
public ItemProcessor<Access, Access> getDataProcessor() {
return new ItemProcessor<Access, Access>() {
@Override
public Access process(Access access) throws Exception {
log.info("processor data : " + access.toString()); // simulated processing: this only logs the item
return access;
}
};
// As a lambda this can also be written as:
// return access -> {
// log.info("processor data : " + access.toString());
// return access;
// };
}

@Bean
public ItemWriter<Access> getDataWriter() {
return list -> {
for (Access access : list) {
log.info("write data : " + access); // simulated write: put the real write logic here
}
};
}
}
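
The faultTolerant() settings in handleDataStep() combine two limits, and a plain-Java sketch may make the order clearer. The assumptions are loud here: `RetrySkipSketch`, `Result`, and `process` are hypothetical names, the retry limit is treated as the total number of attempts per item, and the real framework's chunk rollback and re-processing are ignored. The sketch retries a failing item up to the retry limit, skips it if it still fails, and aborts once more items would be skipped than the skip limit allows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplification of retry-then-skip; not Spring Batch's implementation.
final class RetrySkipSketch {

    static final class Result<O> {
        final List<O> written = new ArrayList<>();
        int skipped;
    }

    static <I, O> Result<O> process(List<I> items, Function<I, O> processor,
                                    int retryLimit, int skipLimit) {
        Result<O> result = new Result<>();
        for (I item : items) {
            RuntimeException last = null;
            boolean done = false;
            // Assumption: retryLimit counts total attempts for one item.
            for (int attempt = 1; attempt <= retryLimit && !done; attempt++) {
                try {
                    result.written.add(processor.apply(item));
                    done = true;
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            if (!done) {
                // Retries exhausted: skip the item unless the skip budget is already used up.
                if (result.skipped >= skipLimit) {
                    throw new IllegalStateException("skip limit " + skipLimit + " exceeded", last);
                }
                result.skipped++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> failsOnEven = n -> {
            if (n % 2 == 0) {
                throw new IllegalArgumentException("bad item " + n);
            }
            return n;
        };
        Result<Integer> r = process(Arrays.asList(1, 2, 3, 4), failsOnEven, 3, 100);
        System.out.println(r.written + " skipped=" + r.skipped); // [1, 3] skipped=2
    }
}
```

With a skip limit of 1 instead of 100, the same input would abort on item 4, which corresponds to the step failing and the job ending as FAILED.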

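6. With the basic Job configured, load some data into the Access table (the demo data, access.sql, is in the git repository) and write a Spring Boot main class to test it.

  Note: declare each component of the Job with the @Bean annotation; only then are the corresponding operations recorded correctly in the metadata tables: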

package name.ealen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Created by EalenXie on 2018/9/10 14:41.
*/
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchApplication.class, args);
}
}

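7. Run it to see the basic data processing in action; both the processing and the writing are simulated here:

8. Check the metadata tables to verify how the Job ran:

As an aside, I previously wrote a post on integrating Spring Boot with Quartz, and you can probably guess where this is going. A Spring Batch job is a natural Job, and Quartz can serve as the scheduler that drives it. The two work well together.

Thanks to everyone for the feedback and support.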
