{{announcement.body}}
{{announcement.title}}

Spring Batch — AsyncItemProcessor and AsyncItemWriter Example

DZone 's Guide to

Spring Batch — AsyncItemProcessor and AsyncItemWriter Example

We will show how to read data from one table and write it into another table of the same schema making the use of AsyncItemProcessor and AsyncItemWriter.

· Java Zone ·
Free Resource

In this example, we will show how to read data from one table and write it into another table of the same schema making the use of AsyncItemProcessor and AsyncItemWriter.

Asynchronous Processors help you to scale the processing of items. In the asynchronous processor use case, an AsyncItemProcessor serves as a dispatcher, executing the logic of the ItemProcessor for an item on a new thread. Once the item completes, the Future is passed to the AsynchItemWriter to be written.

You may also like: An Introduction to Spring Batch

Therefore, you can increase performance by using asynchronous item processing, basically allowing you to implement fork-join scenarios. The AsyncItemWriter gathers the results and writes back the chunk as soon as all the results become available.

The following example shows how to configure the AsyncItemProcessor.

XML Configuration

Java




x


 
1
<bean id="processor" class="org.springframework.batch.integration.async.AsyncItemProcessor">
2
  <p>
3
    <bean class="your.ItemProcessor"/>
4
  </property>
5
  <p>
6
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
7
  </property>
8
</bean>



Java Configuration

Java




xxxxxxxxxx
1


 
1
@Bean
2
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
3
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
4
    asyncItemProcessor.setTaskExecutor(taskExecutor);
5
    asyncItemProcessor.setDelegate(itemProcessor);
6
    return asyncItemProcessor;
7
}



The delegate property refers to your ItemProcessor bean, and the taskExecutor property refers to the TaskExecutor of your choice.

The following example shows how to configure the AsyncItemWriter :

XML Configuration

Java




xxxxxxxxxx
1


 
1
<bean id="itemWriter" class="org.springframework.batch.integration.async.AsyncItemWriter">
2
  <p>
3
    <bean id="itemWriter" class="your.ItemWriter"/>
4
  </property>
5
</bean>



Java Configurations

Java




xxxxxxxxxx
1


 
1
@Bean
2
public AsyncItemWriter writer(ItemWriter itemWriter) {
3
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
4
    asyncItemWriter.setDelegate(itemWriter);
5
    return asyncItemWriter;
6
}



Again, the delegate property is actually a reference to your ItemWriter bean.

Maven Dependency

Java




xxxxxxxxxx
1
73


 
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0"
3
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5
    <modelVersion>4.0.0</modelVersion>
6
    <p>
7
        <groupId>org.springframework.boot</groupId>
8
        <artifactId>spring-boot-starter-parent</artifactId>
9
        <version>2.2.2.RELEASE</version>
10
        <relativePath /> <!-- lookup parent from repository -->
11
    </parent>
12
    <groupId>com.example</groupId>
13
    <artifactId>asyncItemProcessorItemWriter</artifactId>
14
    <version>0.0.1-SNAPSHOT</version>
15
    <p>jar</packaging>
16
    <name>asyncItemProcessorItemWriter</name>
17
    <description>Demo project for Spring Boot</description>
18
 
          
19
    <p>
20
        <java.version>1.8</java.version>
21
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
22
    </properties>
23
 
          
24
    <dependencies>
25
        <dependency>
26
            <groupId>org.springframework.boot</groupId>
27
            <artifactId>spring-boot-starter-batch</artifactId>
28
        </dependency>
29
        <dependency>
30
            <groupId>org.springframework.boot</groupId>
31
            <artifactId>spring-boot-starter-jdbc</artifactId>
32
        </dependency>
33
        <dependency>
34
            <groupId>org.springframework.batch</groupId>
35
            <artifactId>spring-batch-integration</artifactId>
36
        </dependency>
37
        <dependency>
38
            <groupId>com.h2database</groupId>
39
            <artifactId>h2</artifactId>
40
            <scope>runtime</scope>
41
        </dependency>
42
        <dependency>
43
            <groupId>mysql</groupId>
44
            <artifactId>mysql-connector-java</artifactId>
45
            <scope>runtime</scope>
46
        </dependency>
47
        <dependency>
48
            <groupId>org.projectlombok</groupId>
49
            <artifactId>lombok</artifactId>
50
            <optional>true</optional>
51
        </dependency>
52
        <dependency>
53
            <groupId>org.springframework.boot</groupId>
54
            <artifactId>spring-boot-starter-test</artifactId>
55
            <scope>test</scope>
56
        </dependency>
57
        <dependency>
58
            <groupId>org.springframework.batch</groupId>
59
            <artifactId>spring-batch-test</artifactId>
60
            <scope>test</scope>
61
        </dependency>
62
    </dependencies>
63
 
          
64
    <build>
65
        <p>
66
            <p>
67
                <groupId>org.springframework.boot</groupId>
68
                <artifactId>spring-boot-maven-plugin</artifactId>
69
            </plugin>
70
        </plugins>
71
    </build>
72
</project>
73
 
          



CustomerRowMapper — An interface used by mapping rows of an on a per-row basis. Implementations of this interface perform the actual work of mapping each row to a result object but don't need to worry about exception handling will be caught and handled by the calling JdbcTemplate.

Java




xxxxxxxxxx
1
10


 
1
public class CustomerRowMapper implements RowMapper<Customer> {
2
    @Override
3
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
4
        return Customer.builder().id(rs.getLong("id"))
5
                .firstName(rs.getString("firstName"))
6
                .lastName(rs.getString("lastName"))
7
                .birthdate(rs.getString("birthdate"))
8
                .build();
9
    }
10
}



Customer — Domain Object which holds Customer Data.

Java




xxxxxxxxxx
1
10


 
1
@AllArgsConstructor
2
@NoArgsConstructor
3
@Builder
4
@Data
5
public class Customer {
6
    private Long id;
7
    private String firstName;
8
    private String lastName;
9
    private String birthdate;
10
}



JobConfiguration — Convenience interface that combines

Java
xxxxxxxxxx
1
95
 
1
@Configuration
2
public class JobConfiguration {
3
    @Autowired
4
    private JobBuilderFactory jobBuilderFactory;
5
    
6
    @Autowired
7
    private StepBuilderFactory stepBuilderFactory;
8
    
9
    @Autowired
10
    private DataSource dataSource;
11
    
12
    
13
    @Bean
14
    public JdbcPagingItemReader<Customer> customerPagingItemReader(){
15
        // reading database records using JDBC in a paging fashion
16
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
17
        reader.setDataSource(this.dataSource);
18
        reader.setFetchSize(1000);
19
        reader.setRowMapper(new CustomerRowMapper());
20
        
21
        // Sort Keys
22
        Map<String, Order> sortKeys = new HashMap<>();
23
        sortKeys.put("id", Order.ASCENDING);
24
        
25
        // MySQL implementation of a PagingQueryProvider using database specific features.
26
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
27
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
28
        queryProvider.setFromClause("from customer");
29
        queryProvider.setSortKeys(sortKeys);
30
        
31
        reader.setQueryProvider(queryProvider);
32
        
33
        return reader;
34
    }
35
    
36
    @Bean
37
    public ItemProcessor itemProcessor(){
38
        return new ItemProcessor<Customer, Customer>() {
39
 
          
40
            @Override
41
            public Customer process(Customer item) throws Exception {
42
                Thread.sleep(new Random().nextInt(10));
43
                return Customer.builder().id(item.getId()).firstName(item.getFirstName())
44
                        .lastName(item.getLastName()).birthdate(item.getBirthdate()).build();
45
            }
46
        };
47
    }
48
    
49
    @Bean
50
    public AsyncItemProcessor asyncItemProcessor() throws Exception{
51
        AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
52
        asyncItemProcessor.setDelegate(itemProcessor());
53
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
54
        asyncItemProcessor.afterPropertiesSet();
55
        return asyncItemProcessor;
56
    }
57
    
58
    @Bean
59
    public JdbcBatchItemWriter<Customer> customerItemWriter(){
60
        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
61
        writer.setDataSource(dataSource);
62
        writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
63
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
64
        writer.afterPropertiesSet();
65
        
66
        return writer;
67
    }
68
    
69
    @Bean
70
    public AsyncItemWriter<Customer> asyncItemWriter() throws Exception{
71
        AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
72
        asyncItemWriter.setDelegate(customerItemWriter());
73
        asyncItemWriter.afterPropertiesSet();
74
        return asyncItemWriter;  
75
    }
76
    
77
    @SuppressWarnings("unchecked")
78
    @Bean
79
    public Step step1() throws Exception {
80
        return stepBuilderFactory.get("step1")
81
                .chunk(1000)
82
                .reader(customerPagingItemReader())
83
                .processor(asyncItemProcessor())
84
                .writer(asyncItemWriter())
85
                .build();
86
    }
87
    
88
    @Bean
89
    public Job job() throws Exception {
90
        return jobBuilderFactory.get("job")
91
                .start(step1())
92
                .build();
93
    }
94
}
95
 
          


MainApp — Run this application as a Spring Boot App.

Java
xxxxxxxxxx
1
25
 
1
@SpringBootApplication
2
@EnableBatchProcessing
3
public class AsyncItemProcessorItemWriterApplication implements CommandLineRunner{
4
    @Autowired
5
    private JobLauncher jobLauncher;
6
 
          
7
    @Autowired
8
    private Job job;
9
    
10
    public static void main(String[] args) {
11
        SpringApplication.run(AsyncItemProcessorItemWriterApplication.class, args);
12
    }
13
    
14
    @Override
15
    public void run(String... args) throws Exception {
16
        JobParameters jobParameters = new JobParametersBuilder()
17
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
18
                .addDate("date", new Date())
19
                .addLong("time",System.currentTimeMillis()).toJobParameters();
20
        
21
        JobExecution execution = jobLauncher.run(job, jobParameters);
22
        System.out.println("STATUS :: "+execution.getStatus());
23
    }
24
}
25
 
          


Schema.sql

Java
xxxxxxxxxx
1
17
 
1
CREATE TABLE `test`.`customer` (
2
  `id` MEDIUMINT(8) UNSIGNED NOT NULL,
3
  `firstName` VARCHAR(255) NULL,
4
  `lastName` VARCHAR(255) NULL,
5
  `birthdate` VARCHAR(255) NULL,
6
  PRIMARY KEY (`id`)
7
  );
8
 
          
9
  
10
  
11
 CREATE TABLE `test`.`new_customer` (
12
  `id` MEDIUMINT(8) UNSIGNED NOT NULL,
13
  `firstName` VARCHAR(255) NULL,
14
  `lastName` VARCHAR(255) NULL,
15
  `birthdate` VARCHAR(255) NULL,
16
  PRIMARY KEY (`id`)
17
  );


Application.properties

Java
xxxxxxxxxx
1
 
1
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
2
spring.datasource.url=jdbc:mysql://localhost:3306/test
3
spring.datasource.username=root
4
spring.datasource.password=root
5
spring.batch.initialize-schema=always
6
spring.batch.job.enabled=false


Output

From the output, we can conclude that we were able to successfully migrate data from one table to another by making the use of AsyncItemProcessor and AsyncItemWriter.

Java
xxxxxxxxxx
1
33
 
1
mysql> use test;
2
Database changed
3
mysql> show tables;
4
+------------------------------+
5
| Tables_in_test               |
6
+------------------------------+
7
| batch_job_execution          |
8
| batch_job_execution_context  |
9
| batch_job_execution_params   |
10
| batch_job_execution_seq      |
11
| batch_job_instance           |
12
| batch_job_seq                |
13
| batch_step_execution         |
14
| batch_step_execution_context |
15
| batch_step_execution_seq     |
16
| customer                     |
17
| new_customer                 |
18
+------------------------------+
19
11 rows in set (0.00 sec)
20
 
          
21
mysql> select * from new_customer limit 5;
22
+----+-----------+----------+---------------------+
23
| id | firstName | lastName | birthdate           |
24
+----+-----------+----------+---------------------+
25
|  1 | John      | Doe      | 10-10-1952 10:10:10 |
26
|  2 | Amy       | Eugene   | 05-07-1985 17:10:00 |
27
|  3 | Laverne   | Mann     | 11-12-1988 10:10:10 |
28
|  4 | Janice    | Preston  | 19-02-1960 10:10:10 |
29
|  5 | Pauline   | Rios     | 29-08-1977 10:10:10 |
30
+----+-----------+----------+---------------------+
31
5 rows in set (0.00 sec)
32
 
          
33
mysql>


Further Reading

Spring Batch Read an XML File and Write to Oracle Database

Batch Processing Large Data Sets With Spring Boot and Spring Batch

Spring Batch: a Typical Use Case

Topics:
spring batch ,java ,tutorial ,java configuration ,xml ,xml configuration ,maven ,maven dependency

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}