Spring Batch: a Typical Use Case
Want to learn more about using Spring Batch in your Java applications? Check out this tutorial using Spring Batch, Spring Boot, MongoDB, and Java 8.
In this article, I would like to show you a typical example of using Spring Batch. I assume that the reader has basic knowledge of Spring Boot, Spring Batch, MongoDB, and Java 8.
Let's get started.
Batch
What is a batch in Spring? In short, a batch executes jobs, and a single job contains at least one step. The case I am going to describe is a batch that executes one job, and that job contains several independent steps.
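To make that vocabulary concrete, here is a minimal sketch of a one-step job; the names are illustrative and not part of the real application:
@Bean
public Job exampleJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
    Step step = steps.get("exampleStep")
            .tasklet((contribution, chunkContext) -> {
                // the unit of work for this step goes here
                return RepeatStatus.FINISHED;
            })
            .build();
    return jobs.get("exampleJob").start(step).build();
}
But first, let me explain why I had to build it.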
Some time ago, I had to add new functionality to an existing system. That system had to get data from an external database. The external database was accessible through a REST API, but we could not use it "on the fly" because it took too much time. So, we decided to replicate the data from the external database into the database our system was already using. In other words, we had to download what we needed and store it in our database. After that, we could use the data "on the fly" without any performance overhead.
After the analysis, the picture of the application emerged. The application should do the following:
- Download the data from an external database via the REST API and save them as CSV files.
- Each data set is independent, so steps in jobs also have to be independent (with one exception).
- Create as many MongoDB collections as CSV files.
- Convert each row from a CSV file to a MongoDB document.
- Save documents in relevant collections.
- Remove CSV files afterward.
- Keep the details of the batch, job, and step executions.
Preconditions
System requirements are as follows:
- Java 8
- Maven 3.5.2
- Spring Boot 1.5.10
- Spring Web 4.3.14
- MongoDB 3.4
Here are the dependencies defined in the pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.3.14.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.5</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.5</version>
</dependency>
I decided to create a Spring Boot application with the Spring Batch and MongoDB starters. I added a separate dependency for spring-web to have a REST client; spring-web uses httpcomponents under the hood. The commons-csv and commons-io libraries are used to process CSV files.
Implementation and Explanation
I went through all the Spring Batch documentation available on Spring's web page and decided that, in my case, I should use the Tasklet approach; there was no need for an item reader, processor, and writer. Each step should do the following:
- Collect info about CSV headers
- Download data in the CSV form
- Convert each row into a MongoDB document
- Insert the documents into the database in chunks (see the sketch after this list)
- Repeat the above steps until everything has been downloaded and processed
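Here is a rough sketch of the conversion and chunked insert, assuming an injected MongoTemplate named mongoTemplate; the file path, collection name, and chunk size are placeholders, not the project's real values:
try (Reader reader = new FileReader("./csv/example.csv");
     CSVParser parser = CSVFormat.DEFAULT.withFirstRecordAsHeader().parse(reader)) {
    List<DBObject> chunk = new ArrayList<>();
    for (CSVRecord record : parser) {
        BasicDBObject document = new BasicDBObject();
        // each CSV header becomes a field of the document
        record.toMap().forEach(document::append);
        chunk.add(document);
        if (chunk.size() == 1000) { // mongodb.bulk.batchSize
            mongoTemplate.insert(chunk, "exampleCollectionTemp");
            chunk.clear();
        }
    }
    if (!chunk.isEmpty()) {
        mongoTemplate.insert(chunk, "exampleCollectionTemp");
    }
}
Flushing every chunk keeps memory bounded, which is exactly why the csv.chunkSize property exists in the configuration below.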
Let us look at the code. First, the application.properties. It contains MongoDB details, external system details (omitted below for security reasons), and some other properties:
spring.data.mongodb.host= localhost
spring.data.mongodb.port= 27017
spring.data.mongodb.database= db-replica
spring.data.mongodb.repositories.enabled= true
mongodb.bulk.batchSize= 1000
# [s,S] stands for seconds, [m,M] stands for minutes, [h,H] stands for hours. Format: 1m = 1 minute, 12H = 12 hours
request.connection.timeout= 2h
# how many lines from the CSV will be loaded into program memory. Too many could cause an OutOfMemoryError.
csv.chunkSize= 50000
# save csv files to that directory
csv.directory= ./csv
# admin log purposes; if set to 'true', logging is active
log.while.waiting.for.response= true
# admin log purposes; frequency, in seconds, at which a log message appears.
# e.g. 15 means a message will appear in the logs every 15 seconds until the request finishes
log.wait.interval= 5
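The request.connection.timeout value uses the custom format described in the comment above. A tiny, hypothetical helper (parseTimeout is my illustrative name, not a class from the project) could turn such a value into milliseconds like this:
static long parseTimeout(String value) {
    // last character is the unit, the rest is the amount, e.g. "2h" or "30s"
    char unit = Character.toLowerCase(value.charAt(value.length() - 1));
    long amount = Long.parseLong(value.substring(0, value.length() - 1));
    switch (unit) {
        case 's': return TimeUnit.SECONDS.toMillis(amount);
        case 'm': return TimeUnit.MINUTES.toMillis(amount);
        case 'h': return TimeUnit.HOURS.toMillis(amount);
        default: throw new IllegalArgumentException("Unknown time unit: " + unit);
    }
}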
The application had to download data from several endpoints, so I decided that each endpoint should be handled by its own step. Furthermore, each step should be an independent Tasklet. I defined the same abstraction for each step because every step processes the data in the same way but has to store it in a different collection. Here is the abstraction:
abstract class AbstractStep implements Tasklet {
protected final Logger logger;
private static final String ERROR_COLLECTION_NAME = "csvErrors";
@Value("${palantir.endpoint}")
private String basicUrl;
@Value("${mongodb.bulk.batchSize}")
private int batchSize = 1000;
@Value("${csv.chunkSize}")
private int chunkSize = 50000;
@Value("${palantir.branch}")
private String branchName;
@Value("${csv.directory}")
private String csvDirectory = "./csv";
private MongoTemplate mongoTemplate;
private final PalantirClient palantirClient;
private final String dataSetName;
private final String collectionName;
private final String dataSetRid;
private String[] headers;
private final String filePath;
private final Collection<CreationDetails> creationDetails;
protected AbstractStep(PalantirClient palantirClient, MongoTemplate mongoTemplate,
String dataSetName, String collectionName, String dataSetRid) {
this.palantirClient = palantirClient;
this.mongoTemplate = mongoTemplate;
this.dataSetName = dataSetName;
this.collectionName = collectionName;
this.dataSetRid = dataSetRid;
this.filePath = String.format("%s/%s.csv", csvDirectory, dataSetName);
this.creationDetails = new HashSet<>();
this.logger = LoggerFactory.getLogger(this.getClass());
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
throws IOException, JSONException {
TimeLog timeLog = new TimeLog(getClass().getSimpleName());
timeLog.logTime("dropTemporaryCollection");
dropTemporaryCollection();
timeLog.logTime("extractColumns");
extractColumns();
timeLog.logTime("downloadCsv");
downloadCsv();
timeLog.logTime("mapAndSaveToDB");
mapAndSaveToDB();
timeLog.logTime("createIndexes");
createIndexes();
timeLog.logTime("renameTempCollection");
renameTempCollection();
timeLog.logTime("upsertCreationDetails");
upsertCreationDetails();
timeLog.logTime("deleteFile");
deleteFile();
timeLog.done();
return RepeatStatus.FINISHED;
}
...
protected abstract void createIndexes();
protected abstract QueryColumnBuilder getQueryColumnBuilder(String branchName);
protected abstract CreationDetails createCreationDetail(DBObject dbObject, Date created);
}
The important classes here are the PalantirClient and the MongoTemplate. The PalantirClient is responsible for getting the data from the external system via a REST API; 'Palantir' is the name of that external system. I will not describe the client here because it is a simple REST client with a few methods, and it is available on GitHub. The MongoTemplate is specific to MongoDB and helps execute CRUD operations on MongoDB collections and documents.
AbstractStep implements the Tasklet interface from the Spring Batch module, so it has to implement the execute method. This method is the heart of the application: it runs a set of small steps that have to be done in order to download, convert, and store the data in MongoDB. From the design patterns point of view, execute is a template method. When it comes to storing the data, for safety reasons, a temporary collection is built first. Only when all of the data has been processed successfully are the proper indexes created and the temporary collection renamed to the target collection. The TimeLog is a simple logger that records how long particular methods take. AbstractStep also contains some domain-specific attributes that can be left without explanation; they are valid only for this specific case.
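As a sketch of that temporary-collection strategy (the exact code is on GitHub; the 'Temp' suffix is my assumption), the drop and rename operations can be expressed with MongoTemplate like this:
private void dropTemporaryCollection() {
    // dropping a collection that does not exist yet is a no-op
    mongoTemplate.dropCollection(collectionName + "Temp");
}

private void renameTempCollection() {
    // rename(newName, dropTarget = true) swaps the fresh data in place of the old collection
    mongoTemplate.getDb()
            .getCollection(collectionName + "Temp")
            .rename(collectionName, true);
}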
Now, let's have a look at one of the concrete implementations of that abstract class:
@Component
public class OperationalRouteStep extends AbstractStep {
private static final String COLLECTION_NAME = "operationalRoutes";
private static final String DATA_SET_NAME = "operational_routes";
private static final String DATA_SET_RID = "4ef1e435-cb2a-450e-ba18-e42263057379";
@Autowired
public OperationalRouteStep(MongoTemplate mongoTemplate, PalantirClient palantirClient) {
super(palantirClient, mongoTemplate, DATA_SET_NAME, COLLECTION_NAME, DATA_SET_RID);
}
@Override
protected void createIndexes() {
DBCollection tempDBCollection = getTempDBCollection();
String indexName = "shipment_version_instance_id_1";
logger.debug("Creating index [{}]", indexName);
Index index = new Index().on("SHIPMENT_VERSION_INSTANCE_ID", Sort.Direction.DESC)
.background();
tempDBCollection.createIndex(index.getIndexKeys(), indexName, false);
}
@Override
protected QueryColumnBuilder getQueryColumnBuilder(String branchName) {
return QueryColumnBuilder.queryForOperationalRoutesColumns(branchName);
}
@Override
protected CreationDetails createCreationDetail(DBObject dbObject, Date creationDate) {
return new CreationDetails(
"SHIPMENT_VERSION_INSTANCE_ID",
String.valueOf(dbObject.get("SHIPMENT_VERSION_INSTANCE_ID")),
creationDate
);
}
}
The OperationalRouteStep class is a Spring bean that contains domain-specific logic. This step is responsible for replicating data from the external system's dataset named operational_routes to the MongoDB collection named operationalRoutes. The class also contains the unique id of the table in the external system. Here, we can see the details of the index. The index is created this way because, in another system, the data is selected by the attribute SHIPMENT_VERSION_INSTANCE_ID. This class uses QueryColumnBuilder to build a script for extracting data from the external system. It also uses CreationDetails, a simple DTO class that holds information about when a specific document was created.
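The exact CreationDetails class is on GitHub, but its shape can be inferred from the constructor call above; a plausible sketch (the field names are my guesses) looks like this:
public class CreationDetails {
    private final String keyName;   // e.g. "SHIPMENT_VERSION_INSTANCE_ID"
    private final String keyValue;  // the document's value for that key
    private final Date created;     // when the document was created

    public CreationDetails(String keyName, String keyValue, Date created) {
        this.keyName = keyName;
        this.keyValue = keyValue;
        this.created = created;
    }
}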
I will not describe QueryColumnBuilder here. It suffices to say that this builder builds a query that gathers information about the columns of the table (the CSV headers). PalantirClient uses this builder in one of its methods; the code is available on GitHub.
I would like to show you another concrete implementation of AbstractStep.
@Component
public class EquipmentCargoStep extends AbstractStep {
private static final String COLLECTION_NAME = "equipmentCargo";
private static final String DATA_SET_NAME = "equipment_cargo";
private static final String DATA_SET_RID = "0fc9d55a-142e-4385-883d-db1c1a5ef2b4";
@Autowired
public EquipmentCargoStep(MongoTemplate mongoTemplate, PalantirClient palantirClient) {
super(palantirClient, mongoTemplate, DATA_SET_NAME, COLLECTION_NAME, DATA_SET_RID);
}
@Override
protected void createIndexes() {
DBCollection tempDBCollection = getTempDBCollection();
String indexName = "fk_shipment_version_1";
logger.debug("Creating index [{}]", indexName);
Index index = new Index().on("FK_SHIPMENT_VERSION", Sort.Direction.DESC)
.background();
tempDBCollection.createIndex(index.getIndexKeys(), indexName, false);
indexName = "equipment_assignment_instance_id_1";
logger.debug("Creating index [{}]", indexName);
index = new Index().on("EQUIPMENT_ASSIGNMENT_INSTANCE_ID", Sort.Direction.DESC)
.background();
tempDBCollection.createIndex(index.getIndexKeys(), indexName, false);
}
@Override
protected QueryColumnBuilder getQueryColumnBuilder(String branchName) {
return QueryColumnBuilder.queryForEquipmentCargoColumns(branchName);
}
@Override
protected CreationDetails createCreationDetail(DBObject dbObject, Date creationDate) {
return new CreationDetails(
"EQUIPMENT_ASSIGNMENT_INSTANCE_ID",
String.valueOf(dbObject.get("EQUIPMENT_ASSIGNMENT_INSTANCE_ID")),
creationDate
);
}
}
As you can see, the EquipmentCargoStep class is very similar to OperationalRouteStep. It creates different indexes on its collection, builds a different CreationDetails, and uses a different QueryColumnBuilder method. Everything else is the same.
And that is it! The concrete classes are short, clean, and understandable. There are other concrete implementations of AbstractStep that look almost the same as the ones above and differ only in the details. But hey, that is about all, right?
So, we have reviewed the steps. Now let's see how the batch and job configuration is done. First, the batch configuration, which is done in the class MainBatchConfigurer:
@Configuration
public class MainBatchConfigurer implements BatchConfigurer {
@Autowired
private ExecutionContextDao mongoExecutionContextDao;
@Autowired
private JobExecutionDao mongoJobExecutionDao;
@Autowired
private JobInstanceDao mongoJobInstanceDao;
@Autowired
private StepExecutionDao mongoStepExecutionDao;
@Override
public JobRepository getJobRepository() {
return new SimpleJobRepository(
mongoJobInstanceDao,
mongoJobExecutionDao,
mongoStepExecutionDao,
mongoExecutionContextDao
);
}
@Override
public PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
@Override
public SimpleJobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Override
public JobExplorer getJobExplorer() {
return new SimpleJobExplorer(
mongoJobInstanceDao,
mongoJobExecutionDao,
mongoStepExecutionDao,
mongoExecutionContextDao
);
}
private JobOperator jobOperator() throws Exception {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobLauncher(getJobLauncher());
jobOperator.setJobExplorer(getJobExplorer());
jobOperator.setJobRepository(getJobRepository());
jobOperator.setJobRegistry(jobRegistry());
return jobOperator;
}
private JobRegistry jobRegistry() {
return new MapJobRegistry();
}
private JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry());
return postProcessor;
}
}
This configuration was done based on the documentation I found on the Spring page. The interesting part here is letting Spring know that MongoDB-backed classes should be used together with the batch. All of the @Autowired interfaces refer to MongoDB implementations, which were a bit hard to find. Why do I even need them? Because I wanted to have all of the Spring Batch goodies out of the box. What are these goodies? In short, all of the data relevant to the execution of the batch and the jobs defined inside it. These implementations, MongoExecutionContextDao, MongoJobExecutionDao, MongoJobInstanceDao, and MongoStepExecutionDao, manually convert objects like JobExecution and StepExecution to MongoDB documents and attach the JobParameters. You can take a look on GitHub or just wait for my next article to learn more!
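To give a feel for what these DAOs do, here is a simplified sketch of mapping a JobExecution to a document; the real mapping is more thorough, and the field names here are illustrative:
private DBObject toDbObject(JobExecution jobExecution) {
    BasicDBObject params = new BasicDBObject();
    // JobParameters is a map of name -> JobParameter; store the raw values
    jobExecution.getJobParameters().getParameters()
            .forEach((name, parameter) -> params.append(name, parameter.getValue()));
    return new BasicDBObject()
            .append("jobExecutionId", jobExecution.getId())
            .append("jobInstanceId", jobExecution.getJobId())
            .append("status", jobExecution.getStatus().toString())
            .append("startTime", jobExecution.getStartTime())
            .append("endTime", jobExecution.getEndTime())
            .append("jobParameters", params);
}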
Now, let us look at the job configuration:
@Component
public class JobConfigurer {
private static final String JOB_NAME = "SYNC-DBS";
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
private final Tasklet equipmentCargoStep;
private final Tasklet haulageEquipmentStep;
private final Tasklet haulageInfoStep;
private final Tasklet operationalRouteStep;
private final Tasklet trackingBookingStep;
private final Tasklet cargoConditioningStep;
@Autowired
public JobConfigurer(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory,
Tasklet equipmentCargoStep, Tasklet haulageEquipmentStep,
Tasklet haulageInfoStep, Tasklet operationalRouteStep,
Tasklet trackingBookingStep, Tasklet cargoConditioningStep) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.equipmentCargoStep = equipmentCargoStep;
this.haulageEquipmentStep = haulageEquipmentStep;
this.haulageInfoStep = haulageInfoStep;
this.operationalRouteStep = operationalRouteStep;
this.trackingBookingStep = trackingBookingStep;
this.cargoConditioningStep = cargoConditioningStep;
}
public Job synchroniseDatabasesJob() {
return jobBuilderFactory.get(JOB_NAME)
.incrementer(parametersIncrementer())
.preventRestart()
// '*' means that we do not care whether the step fails or succeeds; steps are independent and should be executed that way
.start(trackingBookingStep()).on("*").to(operationRouteStep())
.from(operationRouteStep()).on("*").to(equipmentCargoStep())
.from(equipmentCargoStep()).on("*").to(cargoConditioningStep())
.from(cargoConditioningStep()).on("*").to(haulageInfoStep())
// only when the plan is COMPLETED can we download the equipment for the plan
.from(haulageInfoStep()).on("COMPLETED").to(haulageEquipmentStep())
.from(haulageEquipmentStep()).on("*").end()
.end()
.build();
}
private JobParametersIncrementer parametersIncrementer() {
return jobParameters -> {
if (jobParameters == null || jobParameters.isEmpty()) {
return new JobParametersBuilder()
.addDate(JobParameter.RUN_ID.key(), new Date(System.currentTimeMillis()))
.toJobParameters();
}
Date id = jobParameters.getDate(JobParameter.RUN_ID.key(), new Date(System.currentTimeMillis()));
return new JobParametersBuilder()
.addDate(JobParameter.RUN_ID.key(), id)
.toJobParameters();
};
}
private Step trackingBookingStep() {
return this.stepBuilderFactory.get(trackingBookingStep.getClass().getSimpleName())
.tasklet(trackingBookingStep)
.build();
}
private Step operationRouteStep() {
return this.stepBuilderFactory.get(operationalRouteStep.getClass().getSimpleName())
.tasklet(operationalRouteStep)
.build();
}
private Step cargoConditioningStep() {
return this.stepBuilderFactory.get(cargoConditioningStep.getClass().getSimpleName())
.tasklet(cargoConditioningStep)
.build();
}
private Step equipmentCargoStep() {
return this.stepBuilderFactory.get(equipmentCargoStep.getClass().getSimpleName())
.tasklet(equipmentCargoStep)
.build();
}
private Step haulageInfoStep() {
return this.stepBuilderFactory.get(haulageInfoStep.getClass().getSimpleName())
.tasklet(haulageInfoStep)
.build();
}
private Step haulageEquipmentStep() {
return this.stepBuilderFactory.get(haulageEquipmentStep.getClass().getSimpleName())
.tasklet(haulageEquipmentStep)
.build();
}
}
The JobConfigurer is a Spring bean. It autowires all the steps and the default JobBuilderFactory along with the StepBuilderFactory. The StepBuilderFactory is used to build each step from a specific implementation of Tasklet, and the JobBuilderFactory is used to build the job. The job is nothing more than a set of steps executed one after another. Since each step is independent, it does not matter whether the previous step succeeded or failed; the next step should be executed anyway. This is achieved by passing "*" to the on() method. There is one exception to that rule: haulageEquipmentStep should be executed only if haulageInfoStep completed.
Since this application uses Spring Boot, let's have a look at the class with the main method.
@EnableBatchProcessing
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class Application implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(Application.class);
@Autowired
private MainBatchConfigurer batchConfigurer;
@Autowired
private JobConfigurer jobConfigurer;
@Value("${csv.directory}")
private String csvDirectory = "./csv";
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... strings) {
TimeLog timeLog = new TimeLog(getClass().getSimpleName());
try {
long timestamp = System.currentTimeMillis();
logger.debug("Start GCSS batch with timestamp '{}'.", timestamp);
SimpleJobLauncher jobLauncher = batchConfigurer.getJobLauncher();
JobParameters jobParameters = new JobParametersBuilder()
.addDate(JobParameter.RUN_ID.key(), new Date(timestamp))
.toJobParameters();
timeLog.logTime("Job execution");
JobExecution jobExecution = jobLauncher.run(jobConfigurer.synchroniseDatabasesJob(), jobParameters);
logger.debug("Job execution finished, timestamp = {}, job execution status = {}.", timestamp, jobExecution.getStatus());
} catch (Exception e) {
logger.error("Error when launching job.", e);
} finally {
try {
timeLog.logTime("Delete " + csvDirectory);
FileUtils.forceDelete(new File(csvDirectory));
} catch (IOException e) {
logger.error("Error when deleting {} directory.", csvDirectory, e);
}
}
timeLog.done();
System.exit(0);
}
}
The Application class implements CommandLineRunner because it is executed from the command line. Here, Spring Batch is enabled, and DataSourceAutoConfiguration is excluded because I wanted to create my own MongoDB configuration; it can be found in the MongoDBConfig class. Then, the run method launches the batch. And, that is it.
Summary
In this article, I wanted to show you how to build a Spring Batch application that downloads data from one system, converts it, and stores it in another. I skipped some explanations, like converting a CSV file to a MongoDB collection, because that logic is valid only for this business case. But do not worry: if you want to dig deeper into this example, I encourage you to look at the full code on GitHub. Maybe you will find something valuable there. I hope this tutorial was helpful!