
Spring Batch - Apache Kafka Writer and Reader


In this tutorial, we'll learn how to write data into Apache Kafka and read it back.


In this tutorial, we'll learn how to write data into Apache Kafka and read it back. Spring Batch v4.2 brings a number of good features, which are as follows:

  • Support for batch metrics with Micrometer 
  • Support for reading/writing data from/to Apache Kafka topics
  • Support for reading/writing data from/to Apache Avro resources
  • Improved documentation

This release adds a new KafkaItemReader and KafkaItemWriter to read data from and write it to Kafka topics. For more details about these new components, please refer to the Javadoc.

Apache Kafka is a widely used distributed message broker.


KafkaItemReader

The KafkaItemReader is an ItemReader for an Apache Kafka topic. It can be configured to read messages from multiple partitions of the same topic. It stores message offsets in the execution context to support restart capabilities. Spring Batch provides a KafkaItemReaderBuilder to construct an instance of the KafkaItemReader.

KafkaItemWriter

The KafkaItemWriter is an ItemWriter for Apache Kafka that uses a KafkaTemplate to send events to a default topic. Spring Batch provides a KafkaItemWriterBuilder to construct an instance of the KafkaItemWriter.
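The example later in this article wires the writer with setters; the equivalent using the builder mentioned above would look like the following. This is a sketch only, assuming a KafkaTemplate<Long, Customer> bean named kafkaTemplate and the Customer model defined below:

```java
// Sketch only: the same configuration as the setter-based kafkaItemWriter()
// bean shown later, expressed with KafkaItemWriterBuilder.
KafkaItemWriter<Long, Customer> writer = new KafkaItemWriterBuilder<Long, Customer>()
        .kafkaTemplate(kafkaTemplate)       // template that sends to the default topic
        .itemKeyMapper(Customer::getId)     // message key = customer id
        .delete(false)                      // send values, not tombstone records
        .build();
```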

Writer code

Customer.java - This is the model class that holds Customer details.

Java

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}



CustomerFieldSetMapper.java - This class implements FieldSetMapper, the interface used to map data obtained from a FieldSet into an object.

Java

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    private static final DateTimeFormatter DT_FORMAT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }
}
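The DT_FORMAT field above suggests the birthdate strings follow the dd-MM-yyyy HH:mm:ss pattern (matching the output file at the end of the article). A small, framework-free sketch of parsing such a string with that pattern:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class BirthdateParseDemo {
    public static void main(String[] args) {
        // Same pattern as DT_FORMAT in CustomerFieldSetMapper
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");
        LocalDateTime birthdate = LocalDateTime.parse("10-10-1952 10:10:10", fmt);
        System.out.println(birthdate.getYear());  // prints 1952
    }
}
```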



JobConfig.java - This is the main job configuration class. Here we've used KafkaItemWriter to write onto the Kafka topic.

Java

@Configuration
public class JobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private KafkaTemplate<Long, Customer> kafkaTemplate;

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }

    @Bean
    public KafkaItemWriter<Long, Customer> kafkaItemWriter() throws Exception {
        KafkaItemWriter<Long, Customer> writer = new KafkaItemWriter<>();
        writer.setKafkaTemplate(kafkaTemplate);
        writer.setItemKeyMapper(Customer::getId);
        writer.setDelete(false);
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(kafkaItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

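The step above uses chunk(10): items are read one at a time and handed to the writer in lists of up to 10. A simplified, framework-free sketch of that chunk-oriented loop (not Spring Batch's actual implementation, which also handles transactions and restarts):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopDemo {
    // Simplified model of Spring Batch's chunk-oriented processing:
    // read items one by one, buffer chunkSize of them, then "write" the buffer.
    static <T> List<List<T>> process(Iterator<T> reader, int chunkSize) {
        List<List<T>> written = new ArrayList<>();
        List<T> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == chunkSize) {
                written.add(chunk);          // write the full chunk
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk);              // write the final partial chunk
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5);
        System.out.println(process(items.iterator(), 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```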


SpringBatchKafkaWriterApplication.java

Java

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchKafkaWriterApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchKafkaWriterApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("jobId", UUID.randomUUID().toString())
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis()).toJobParameters();

        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}



application.properties

Properties files

##
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=customers-group
##
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=customers-client
##
spring.kafka.consumer.properties.spring.json.trusted.packages=*
##
spring.kafka.template.default-topic=customers4

# Do not execute all Spring Batch jobs in the context on startup
spring.batch.job.enabled=false


The screenshot below shows that the Spring Batch job ran successfully: STATUS :: COMPLETED.


The commands below view messages from the Kafka topic.
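The commands themselves appear only as a screenshot in the original. A typical way to view the messages, assuming a local broker on localhost:9092 and the topic name customers4 from application.properties, is the console consumer that ships with Kafka:

```shell
# Read the topic from the beginning; requires a running Kafka broker.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic customers4 \
  --from-beginning
```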


Reader code

This example reads the Kafka topic using KafkaItemReader and writes the data to a temp file: >> Output Path = C:\Users\pc\AppData\Local\Temp\<anything>.out.

In this example, Customer.java, CustomerLineAggregator.java, and CustomerRowMapper.java are exactly the same as in the example above.
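CustomerLineAggregator itself is not shown in the article. Judging from the output file at the end, it joins the Customer fields with commas; the following is a framework-free sketch of that formatting logic (the real class would implement Spring Batch's LineAggregator<Customer> and receive a Customer instance rather than separate fields):

```java
public class CustomerLineAggregatorDemo {
    // Hypothetical formatting logic inferred from the output file shown below;
    // not the article's actual CustomerLineAggregator implementation.
    static String aggregate(long id, String firstName, String lastName, String birthdate) {
        return id + ", " + firstName + ", " + lastName + "," + birthdate;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(1L, "John", "Doe", "10-10-1952 10:10:10"));
        // prints: 1, John, Doe,10-10-1952 10:10:10
    }
}
```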

JobConfiguration.java

Java

@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private KafkaProperties properties;

    @Bean
    public KafkaItemReader<Long, Customer> kafkaItemReader() {
        // The key/value deserializers come from application.properties
        // via the auto-configured KafkaProperties.
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Long, Customer>()
            .partitions(0)
            .consumerProperties(props)
            .name("customers-reader")
            .saveState(true)
            // Note: must match the topic the writer application produced to.
            .topic("customers")
            .build();
    }

    @Bean
    public FlatFileItemWriter<Customer> customerItemWriter() throws Exception {
        String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
        System.out.println(">> Output Path = " + customerOutputPath);

        FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
        // A LineAggregator implementation that simply calls Object.toString() on the given object:
        // itemWriter.setLineAggregator(new PassThroughLineAggregator<>());

        // Alternate way
        itemWriter.setLineAggregator(new CustomerLineAggregator());

        itemWriter.setResource(new FileSystemResource(customerOutputPath));
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(100)
                .reader(kafkaItemReader())
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}



application.properties

Properties files

##
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=customers-group
##
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=customers-client
##
spring.kafka.consumer.properties.spring.json.trusted.packages=*
##
spring.kafka.template.default-topic=customers4

spring.batch.job.enabled=false

server.port=8888


SpringBatchKafkaReaderApplication.java

Java

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchKafkaReaderApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchKafkaReaderApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", UUID.randomUUID().toString())
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis()).toJobParameters();

        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}



The file is created here: >> Output Path = C:\Users\pc\AppData\Local\Temp\customerOutput5126361010566575708.out

Plain Text

1, John, Doe,10-10-1952 10:10:10
2, Amy, Eugene,05-07-1985 17:10:00
3, Laverne, Mann,11-12-1988 10:10:10
4, Janice, Preston,19-02-1960 10:10:10
5, Pauline, Rios,29-08-1977 10:10:10
6, Perry, Burnside,10-03-1981 10:10:10
7, Todd, Kinsey,14-12-1998 10:10:10
8, Jacqueline, Hyde,20-03-1983 10:10:10
9, Rico, Hale,10-10-2000 10:10:10
10, Samuel, Lamm,11-11-1999 10:10:10
11, Robert, Coster,10-10-1972 10:10:10
12, Tamara, Soler,02-01-1978 10:10:10
13, Justin, Kramer,19-11-1951 10:10:10
14, Andrea, Law,14-10-1959 10:10:10
15, Laura, Porter,12-12-2010 10:10:10
16, Michael, Cantu,11-04-1999 10:10:10
17, Andrew, Thomas,04-05-1967 10:10:10
18, Jose, Hannah,16-09-1950 10:10:10
19, Valerie, Hilbert,13-06-1966 10:10:10
20, Patrick, Durham,12-10-1978 10:10:10


