Chunk Oriented Processing in Spring Batch
Processing large data sets is one of the most common problems in the software world. Spring Batch is a lightweight and robust batch framework for processing such data sets. It offers two processing styles: ‘TaskletStep Oriented’ and ‘Chunk Oriented’. This article explains the Chunk Oriented Processing model. The article TaskletStep Oriented Processing in Spring Batch is also recommended for readers who want to see how a TaskletStep oriented job is developed.

Chunk Oriented Processing was introduced in Spring Batch v2.0. It refers to reading the data one item at a time and creating ‘chunks’ that are written out within a transaction boundary. One item is read from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed.

Basically, this feature should be used when a step has to both read and write data items. Otherwise, TaskletStep Oriented processing can be used when a step only reads or only writes data.

The Chunk Oriented Processing model exposes three important interfaces, ItemReader, ItemProcessor and ItemWriter, via the org.springframework.batch.item package.

ItemReader : This interface provides the data. It reads the items which will be processed.
ItemProcessor : This interface is used for item transformation. It processes an input object and transforms it into an output object.
ItemWriter : This interface is used for generic output operations. It writes the items which have been transformed by the ItemProcessor. For example, the items can be written to a database, to memory or to an output stream. In this sample application, we will write to a database.

Let us take a look at how to develop with the Chunk Oriented Processing model.
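As a rough illustration of the read, process and write cycle described above, the following minimal sketch simulates a chunk loop in plain Java, without Spring Batch. The ChunkLoopSketch class, its run method and the in-memory list that stands in for a committed transaction are hypothetical names introduced only for this illustration. The real framework wraps each chunk in a transaction and also handles restart, skip and retry, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

/** Hypothetical, framework-free simulation of a chunk-oriented step. */
public class ChunkLoopSketch {

    /**
     * Reads up to commitInterval items, processes them, then "writes" the
     * whole chunk at once. Returns the chunks in the order they were written.
     */
    public static List<List<String>> run(Iterator<String> reader, int commitInterval) {
        List<List<String>> committedChunks = new ArrayList<>();
        boolean exhausted = false;

        while (!exhausted) {
            // Read phase: collect items until the commit interval is reached
            // or the reader has nothing left (the real ItemReader returns null).
            List<String> items = new ArrayList<>();
            while (items.size() < commitInterval) {
                if (!reader.hasNext()) {
                    exhausted = true;
                    break;
                }
                items.add(reader.next());
            }
            if (items.isEmpty()) {
                break; // nothing was read, so there is nothing to write
            }

            // Process phase: transform every item of the chunk.
            List<String> processed = new ArrayList<>();
            for (String item : items) {
                processed.add(item.toUpperCase(Locale.ENGLISH));
            }

            // Write phase: the chunk is handed over as one list; adding it to
            // committedChunks stands in for the transaction commit.
            committedChunks.add(processed);
        }
        return committedChunks;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("firstname_0", "firstname_1", "firstname_2");
        // Prints [[FIRSTNAME_0, FIRSTNAME_1], [FIRSTNAME_2]]
        System.out.println(run(input.iterator(), 2));
    }
}
```

With three input items and a commit interval of 2, the sketch produces one full chunk of two items and a final, shorter chunk of one item.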
Used Technologies :
JDK 1.7.0_09
Spring 3.1.3
Spring Batch 2.1.9
Hibernate 4.1.8
Tomcat JDBC 7.0.27
MySQL 5.5.8
MySQL Connector 5.1.17
Maven 3.0.4

Step 1 : Create Maven Project
A Maven project is created as below. (It can be created by using Maven or an IDE plug-in.)

Step 2 : Create USER Table
A new USER table is created by executing the script below:

CREATE TABLE ONLINETECHVISION.USER (
    id int(11) NOT NULL AUTO_INCREMENT,
    name varchar(45) NOT NULL,
    surname varchar(45) NOT NULL,
    PRIMARY KEY (`id`)
);

Step 3 : Libraries
Firstly, the following properties and dependencies are added to Maven’s pom.xml (dependencies are given as groupId : artifactId : version).

Properties :
spring.version = 3.1.3.RELEASE
spring-batch.version = 2.1.9.RELEASE

Dependencies :
org.springframework : spring-core : ${spring.version}
org.springframework : spring-context : ${spring.version}
org.springframework : spring-tx : ${spring.version}
org.springframework : spring-orm : ${spring.version}
org.springframework.batch : spring-batch-core : ${spring-batch.version}
org.hibernate : hibernate-core : 4.1.8.Final
org.apache.tomcat : tomcat-jdbc : 7.0.27
mysql : mysql-connector-java : 5.1.17
log4j : log4j : 1.2.16

maven-compiler-plugin (Maven plugin) is used to compile the project with JDK 1.7 :
org.apache.maven.plugins : maven-compiler-plugin : 3.0, with source 1.7 and target 1.7

The following Maven plugin can be used to create a runnable jar :
org.apache.maven.plugins : maven-shade-plugin : 2.0, executed in the package phase with the shade goal. Its configuration values are 1.7, 1.7, the main class com.onlinetechvision.exe.Application, and the resources META-INF/spring.handlers and META-INF/spring.schemas.

Step 4 : Create User Entity
User Entity is created. This entity will be stored after processing.
package com.onlinetechvision.user;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

/**
 * User Entity
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
@Entity
@Table(name="USER")
public class User {

    private int id;
    private String name;
    private String surname;

    @Id
    @GeneratedValue(strategy=GenerationType.AUTO)
    @Column(name="ID", unique = true, nullable = false)
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    @Column(name="NAME", unique = true, nullable = false)
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Column(name="SURNAME", unique = true, nullable = false)
    public String getSurname() {
        return surname;
    }

    public void setSurname(String surname) {
        this.surname = surname;
    }

    @Override
    public String toString() {
        StringBuffer strBuff = new StringBuffer();
        strBuff.append("id : ").append(getId());
        strBuff.append(", name : ").append(getName());
        strBuff.append(", surname : ").append(getSurname());
        return strBuff.toString();
    }
}

Step 5 : Create IUserDAO Interface
IUserDAO Interface is created to expose data access functionality.

package com.onlinetechvision.user.dao;

import java.util.List;

import com.onlinetechvision.user.User;

/**
 * User DAO Interface
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public interface IUserDAO {

    /**
     * Adds User
     *
     * @param user User to be added
     */
    void addUser(User user);

    /**
     * Gets User List
     *
     * @return List<User> - User list
     */
    List<User> getUsers();
}

Step 6 : Create UserDAO IMPL
UserDAO Class is created by implementing IUserDAO Interface.
package com.onlinetechvision.user.dao;

import java.util.List;

import org.hibernate.SessionFactory;

import com.onlinetechvision.user.User;

/**
 * User DAO
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class UserDAO implements IUserDAO {

    private SessionFactory sessionFactory;

    /**
     * Gets Hibernate Session Factory
     *
     * @return SessionFactory - Hibernate Session Factory
     */
    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    /**
     * Sets Hibernate Session Factory
     *
     * @param sessionFactory Hibernate Session Factory
     */
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Adds User
     *
     * @param user User to be added
     */
    @Override
    public void addUser(User user) {
        getSessionFactory().getCurrentSession().save(user);
    }

    /**
     * Gets User List
     *
     * @return List<User> - User list
     */
    @SuppressWarnings({ "unchecked" })
    @Override
    public List<User> getUsers() {
        // Query.list() returns a raw List, hence the unchecked warning.
        List<User> list = getSessionFactory().getCurrentSession().createQuery("from User").list();
        return list;
    }
}

Step 7 : Create IUserService Interface
IUserService Interface is created for the service layer.

package com.onlinetechvision.user.service;

import java.util.List;

import com.onlinetechvision.user.User;

/**
 *
 * User Service Interface
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public interface IUserService {

    /**
     * Adds User
     *
     * @param user User to be added
     */
    void addUser(User user);

    /**
     * Gets User List
     *
     * @return List<User> - User list
     */
    List<User> getUsers();
}

Step 8 : Create UserService IMPL
UserService Class is created by implementing IUserService Interface.
package com.onlinetechvision.user.service;

import java.util.List;

import org.springframework.transaction.annotation.Transactional;

import com.onlinetechvision.user.User;
import com.onlinetechvision.user.dao.IUserDAO;

/**
 *
 * User Service
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
@Transactional(readOnly = true)
public class UserService implements IUserService {

    IUserDAO userDAO;

    /**
     * Adds User
     *
     * @param user User to be added
     */
    @Transactional(readOnly = false)
    @Override
    public void addUser(User user) {
        getUserDAO().addUser(user);
    }

    /**
     * Gets User List
     *
     * @return List<User> - User list
     */
    @Override
    public List<User> getUsers() {
        return getUserDAO().getUsers();
    }

    public IUserDAO getUserDAO() {
        return userDAO;
    }

    public void setUserDAO(IUserDAO userDAO) {
        this.userDAO = userDAO;
    }
}

Step 9 : Create TestReader IMPL
TestReader Class is created by implementing ItemReader Interface. This class is called in order to read items. When the read method returns null, the reading operation is completed. The following steps explain in detail how firstJob is executed.

The commit-interval value of firstJob is 2 and the following steps are executed :
1) firstTestReader is called to read the first item (firstname_0, firstsurname_0)
2) firstTestReader is called again to read the second item (firstname_1, firstsurname_1)
3) testProcessor is called to process the first item (FIRSTNAME_0, FIRSTSURNAME_0)
4) testProcessor is called to process the second item (FIRSTNAME_1, FIRSTSURNAME_1)
5) testWriter is called to write the first item (FIRSTNAME_0, FIRSTSURNAME_0) to the database
6) testWriter is called to write the second item (FIRSTNAME_1, FIRSTSURNAME_1) to the database
7) the first and second items are committed and the transaction is closed.
8) firstTestReader is called to read the third item (firstname_2, firstsurname_2)
9) firstTestReader is called again. Its index (3) now exceeds its maxIndex value, so the read method returns null and the item reading operation is completed.
10) testProcessor is called to process the third item (FIRSTNAME_2, FIRSTSURNAME_2)
11) testWriter is called to write the third item (FIRSTNAME_2, FIRSTSURNAME_2) to the database
12) the third item is committed and the transaction is closed.

firstStep is completed with COMPLETED status and secondStep is started. secondJob and thirdJob are executed in the same way.

package com.onlinetechvision.item;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.onlinetechvision.user.User;

/**
 * TestReader Class is created to read items which will be processed
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class TestReader implements ItemReader<User> {

    private int index;
    private int maxIndex;
    private String namePrefix;
    private String surnamePrefix;

    /**
     * Reads items one by one
     *
     * @return User, or null when there are no more items
     *
     * @throws Exception
     * @throws UnexpectedInputException
     * @throws ParseException
     * @throws NonTransientResourceException
     *
     */
    @Override
    public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Returning null tells Spring Batch that the input is exhausted.
        if(index > getMaxIndex()) {
            return null;
        }

        User user = new User();
        user.setName(getNamePrefix() + "_" + index);
        user.setSurname(getSurnamePrefix() + "_" + index);

        incrementIndex();

        return user;
    }

    /**
     * Increments index which defines read-count
     *
     * @return int
     *
     */
    private int incrementIndex() {
        return index++;
    }

    public int getMaxIndex() {
        return maxIndex;
    }

    public void setMaxIndex(int maxIndex) {
        this.maxIndex = maxIndex;
    }

    public String getNamePrefix() {
        return namePrefix;
    }

    public void setNamePrefix(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    public String getSurnamePrefix() {
        return surnamePrefix;
    }

    public void setSurnamePrefix(String surnamePrefix) {
        this.surnamePrefix = surnamePrefix;
    }
}

Step 10 : Create FailedCaseTestReader IMPL
FailedCaseTestReader Class is created in order to simulate the failed job status. In this sample application, when thirdJob is processed at fifthStep, failedCaseTestReader is called and an exception is thrown, so its status will be FAILED.

package com.onlinetechvision.item;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.onlinetechvision.user.User;

/**
 * FailedCaseTestReader Class is created in order to simulate the failed job status.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class FailedCaseTestReader implements ItemReader<User> {

    private int index;
    private int maxIndex;
    private String namePrefix;
    private String surnamePrefix;

    /**
     * Reads items one by one
     *
     * @return User
     *
     * @throws Exception
     * @throws UnexpectedInputException
     * @throws ParseException
     * @throws NonTransientResourceException
     *
     */
    @Override
    public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        // Fails on purpose once maxIndex is reached, so that the step ends with FAILED status.
        if(index >= getMaxIndex()) {
            throw new Exception("Unexpected Error!");
        }

        User user = new User();
        user.setName(getNamePrefix() + "_" + index);
        user.setSurname(getSurnamePrefix() + "_" + index);

        incrementIndex();

        return user;
    }

    /**
     * Increments index which defines read-count
     *
     * @return int
     *
     */
    private int incrementIndex() {
        return index++;
    }

    public int getMaxIndex() {
        return maxIndex;
    }

    public void setMaxIndex(int maxIndex) {
        this.maxIndex = maxIndex;
    }

    public String getNamePrefix() {
        return namePrefix;
    }

    public void setNamePrefix(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    public String getSurnamePrefix() {
        return surnamePrefix;
    }

    public void setSurnamePrefix(String surnamePrefix) {
        this.surnamePrefix = surnamePrefix;
    }
}

Step 11 : Create TestProcessor IMPL
TestProcessor Class is created by implementing
ItemProcessor Interface. This class is called to process items. A User item is received from TestReader, processed and returned to TestWriter.

package com.onlinetechvision.item;

import java.util.Locale;

import org.springframework.batch.item.ItemProcessor;

import com.onlinetechvision.user.User;

/**
 * TestProcessor Class is created to process items.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class TestProcessor implements ItemProcessor<User, User> {

    /**
     * Processes items one by one
     *
     * @param user User to be processed
     * @return User
     * @throws Exception
     *
     */
    @Override
    public User process(User user) throws Exception {
        user.setName(user.getName().toUpperCase(Locale.ENGLISH));
        user.setSurname(user.getSurname().toUpperCase(Locale.ENGLISH));
        return user;
    }
}

Step 12 : Create TestWriter IMPL
TestWriter Class is created by implementing ItemWriter Interface. This class is called to write items to a DB, memory etc.

package com.onlinetechvision.item;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.onlinetechvision.user.User;
import com.onlinetechvision.user.service.IUserService;

/**
 * TestWriter Class is created to write items to DB, memory etc...
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class TestWriter implements ItemWriter<User> {

    private IUserService userService;

    /**
     * Writes items via list
     *
     * @param userList items of the current chunk
     * @throws Exception
     *
     */
    @Override
    public void write(List<? extends User> userList) throws Exception {
        for(User user : userList) {
            getUserService().addUser(user);
        }
        System.out.println("User List : " + getUserService().getUsers());
    }

    public IUserService getUserService() {
        return userService;
    }

    public void setUserService(IUserService userService) {
        this.userService = userService;
    }
}

Step 13 : Create FailedStepTasklet Class
FailedStepTasklet is created by implementing Tasklet Interface. It illustrates the business logic of a failed step.
package com.onlinetechvision.tasklet;

import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * FailedStepTasklet Class illustrates a failed job.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class FailedStepTasklet implements Tasklet {

    private static final Logger logger = Logger.getLogger(FailedStepTasklet.class);

    private String taskResult;

    /**
     * Executes FailedStepTasklet
     *
     * @param stepContribution StepContribution
     * @param chunkContext ChunkContext
     * @return RepeatStatus
     * @throws Exception
     *
     */
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        logger.debug("Task Result : " + getTaskResult());
        throw new Exception("Error occurred!");
    }

    public String getTaskResult() {
        return taskResult;
    }

    public void setTaskResult(String taskResult) {
        this.taskResult = taskResult;
    }
}

Step 14 : Create BatchProcessStarter Class
BatchProcessStarter Class is created to launch the jobs. Also, it logs their execution results.

package com.onlinetechvision.spring.batch;

import org.apache.log4j.Logger;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;

/**
 * BatchProcessStarter Class launches the jobs and logs their execution results.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class BatchProcessStarter {

    private static final Logger logger = Logger.getLogger(BatchProcessStarter.class);

    private Job firstJob;
    private Job secondJob;
    private Job thirdJob;
    private JobLauncher jobLauncher;
    private JobRepository jobRepository;

    /**
     * Starts the jobs and logs their execution results.
     *
     */
    public void start() {
        JobExecution jobExecution = null;
        JobParametersBuilder builder = new JobParametersBuilder();

        try {
            getJobLauncher().run(getFirstJob(), builder.toJobParameters());
            jobExecution = getJobRepository().getLastJobExecution(getFirstJob().getName(), builder.toJobParameters());
            logger.debug(jobExecution.toString());

            getJobLauncher().run(getSecondJob(), builder.toJobParameters());
            jobExecution = getJobRepository().getLastJobExecution(getSecondJob().getName(), builder.toJobParameters());
            logger.debug(jobExecution.toString());

            getJobLauncher().run(getThirdJob(), builder.toJobParameters());
            jobExecution = getJobRepository().getLastJobExecution(getThirdJob().getName(), builder.toJobParameters());
            logger.debug(jobExecution.toString());

        } catch (JobExecutionAlreadyRunningException
                    | JobRestartException
                    | JobInstanceAlreadyCompleteException
                    | JobParametersInvalidException e) {
            logger.error(e);
        }
    }

    public Job getFirstJob() {
        return firstJob;
    }

    public void setFirstJob(Job firstJob) {
        this.firstJob = firstJob;
    }

    public Job getSecondJob() {
        return secondJob;
    }

    public void setSecondJob(Job secondJob) {
        this.secondJob = secondJob;
    }

    public Job getThirdJob() {
        return thirdJob;
    }

    public void setThirdJob(Job thirdJob) {
        this.thirdJob = thirdJob;
    }

    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    public JobRepository getJobRepository() {
        return jobRepository;
    }

    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }
}

Step 15 : Create jdbc.properties
jdbc.properties is created. It defines the data-source information and is read via dataContext.xml.

jdbc.db.driverClassName=com.mysql.jdbc.Driver
jdbc.db.url=jdbc:mysql://localhost:3306/onlinetechvision
jdbc.db.username=root
jdbc.db.password=root
jdbc.db.initialSize=10
jdbc.db.minIdle=3
jdbc.db.maxIdle=10
jdbc.db.maxActive=10
jdbc.db.testWhileIdle=true
jdbc.db.testOnBorrow=true
jdbc.db.testOnReturn=true
jdbc.db.initSQL=SELECT 1 FROM DUAL
jdbc.db.validationQuery=SELECT 1 FROM DUAL
jdbc.db.timeBetweenEvictionRunsMillis=30000

Step 16 : Create dataContext.xml
The Spring configuration file dataContext.xml is created. It covers the dataSource, sessionFactory and transactionManager definitions. Values used in it : com.onlinetechvision.user.User, org.hibernate.dialect.MySQLDialect, true

Step 17 : Create jobContext.xml
The Spring configuration file jobContext.xml is created. It covers the jobRepository, jobLauncher, item reader, item processor, item writer, tasklet and job definitions.

Step 18 : Create applicationContext.xml
The Spring configuration file applicationContext.xml is created. It covers the bean definitions.

Step 19 : Create Application Class
Application Class is created to run the application.

package com.onlinetechvision.exe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.onlinetechvision.spring.batch.BatchProcessStarter;

/**
 * Application Class starts the application.
 *
 * @author onlinetechvision.com
 * @since 10 Dec 2012
 * @version 1.0.0
 *
 */
public class Application {

    /**
     * Starts the application
     *
     * @param args command line arguments
     *
     */
    public static void main(String[] args) {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        BatchProcessStarter batchProcessStarter = (BatchProcessStarter)appContext.getBean("batchProcessStarter");
        batchProcessStarter.start();
    }
}

Step 20 : Build Project
After the OTV_SpringBatch_Chunk_Oriented_Processing project is built, OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar will be created.

Step 21 : Run Project
After the created OTV_SpringBatch_Chunk_Oriented_Processing-0.0.1-SNAPSHOT.jar file is run, the following database and console output logs will be shown :

[Database screenshot]

First Job’s console output :

16.12.2012 19:30:41 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=firstJob]] launched with the following parameters: [{}]
16.12.2012 19:30:41 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=0, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:41 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]
User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2]
16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:43) - JobExecution: id=0, version=2, startTime=Sun Dec 16 19:30:41 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{}], Job=[firstJob]]

Second Job’s console output :

16.12.2012 19:30:42 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=secondJob]] launched with the following parameters: [{}]
16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=1, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]
User List : [id : 181, name : FIRSTNAME_0, surname : FIRSTSURNAME_0, id : 182, name : FIRSTNAME_1, surname : FIRSTSURNAME_1, id : 183, name : FIRSTNAME_2, surname : FIRSTSURNAME_2, id : 184, name : SECONDNAME_0, surname : SECONDSURNAME_0, id : 185, name : SECONDNAME_1, surname : SECONDSURNAME_1, id : 186, name : SECONDNAME_2, surname : SECONDSURNAME_2, id : 187, name : THIRDNAME_0, surname : THIRDSURNAME_0, id : 188, name : THIRDNAME_1, surname : THIRDSURNAME_1, id : 189, name : THIRDNAME_2, surname : THIRDSURNAME_2, id : 190, name : THIRDNAME_3, surname : THIRDSURNAME_3, id : 191, name : FOURTHNAME_0, surname : FOURTHSURNAME_0, id : 192, name : FOURTHNAME_1, surname : FOURTHSURNAME_1, id : 193, name : FOURTHNAME_2, surname : FOURTHSURNAME_2, id : 194, name : FOURTHNAME_3, surname : FOURTHSURNAME_3]
16.12.2012 19:30:42 DEBUG (BatchProcessStarter.java:47) - JobExecution: id=1, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:42 GMT 2012, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, JobParameters=[{}], Job=[secondJob]]

Third Job’s console output :

16.12.2012 19:30:42 INFO (SimpleJobLauncher.java:118) - Job: [FlowJob: [name=thirdJob]] launched with the following parameters: [{}]
16.12.2012 19:30:42 DEBUG (AbstractJob.java:278) - Job execution starting: JobExecution: id=2, version=0, startTime=null, endTime=null, lastUpdated=Sun Dec 16 19:30:42 GMT 2012, status=STARTING, exitStatus=exitCode=UNKNOWN;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]
16.12.2012 19:30:42 DEBUG (TransactionTemplate.java:159) - Initiating transaction rollback on application exception org.springframework.batch.repeat.RepeatException: Exception in batch process; nested exception is java.lang.Exception: Unexpected Error!
...
16.12.2012 19:30:43 DEBUG (BatchProcessStarter.java:51) - JobExecution: id=2, version=2, startTime=Sun Dec 16 19:30:42 GMT 2012, endTime=Sun Dec 16 19:30:43 GMT 2012, lastUpdated=Sun Dec 16 19:30:43 GMT 2012, status=FAILED, exitStatus=exitCode=FAILED;exitDescription=, job=[JobInstance: id=2, version=0, JobParameters=[{}], Job=[thirdJob]]

Step 22 : Download
https://github.com/erenavsarogullari/OTV_SpringBatch_Chunk_Oriented_Processing

REFERENCES :
Chunk Oriented Processing in Spring Batch
January 3, 2013
by Eren Avsarogullari