Parallel Kafka Batch Processing With Kotlin Coroutines in Spring Boot

Learn how Kotlin Coroutines improve Spring Boot Kafka batch processing with parallel execution, resource throttling, and faster database operations.

By Erkin Karanlık · Jun. 16, 26 · Analysis

Managing high-volume message traffic is crucial in distributed architectures, and so is making efficient use of database and CPU resources. Spring Kafka's BatchMessageListener lets a consumer receive messages in batches, but the records in each batch are usually still processed one after another, which creates a sequential bottleneck.

This article explains how Kotlin Coroutines can remove that bottleneck. We will examine how to maximize Kafka message processing performance using structured concurrency principles and resource throttling techniques.

Architectural Bottleneck: Sequential I/O Blocking

In a typical sequential Kafka listener:

Every database or external service call made for a record adds directly to the total processing time of the batch. If processing a batch takes longer than max.poll.interval.ms, the consumer does not call poll() in time and is removed from the consumer group.

A rebalance is then triggered, and that consumer's partitions are redistributed to the other consumers in the group.

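Kotlin

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener

// Assumes a container factory configured for batch listening
@KafkaListener(topics = ["usage-pool-topic"])
fun usagePoolListener(records: List<ConsumerRecord<String, String>>) {
    records.forEach { record ->
        processRecord(record) // Network latency + DB I/O, blocking one record at a time
    }
}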
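As a rough illustration of the time budget, a batch processed sequentially takes about (number of records) × (latency per record). The sketch below assumes Kafka's consumer defaults of max.poll.records = 500 and max.poll.interval.ms = 300000, plus a hypothetical 700 ms per record; the function names are illustrative, and the parallel estimate is idealized (it ignores pool contention and uneven latencies).

```kotlin
// Kafka consumer defaults: max.poll.records = 500, max.poll.interval.ms = 300000 (5 minutes)
val MAX_POLL_RECORDS = 500
val MAX_POLL_INTERVAL_MS = 300_000L

// Sequential processing: each record waits for the previous one to finish
fun sequentialBatchMillis(records: Int, perRecordMs: Long): Long = records * perRecordMs

// Idealized parallel processing: records run in rounds of `parallelism` at a time
fun parallelBatchMillis(records: Int, perRecordMs: Long, parallelism: Int): Long {
    val rounds = (records + parallelism - 1) / parallelism // ceiling division
    return rounds * perRecordMs
}

// True when the time between two poll() calls would exceed max.poll.interval.ms
fun exceedsPollInterval(batchMillis: Long): Boolean = batchMillis > MAX_POLL_INTERVAL_MS

// With a hypothetical 700 ms per record:
// sequentialBatchMillis(500, 700) == 350_000 -> exceeds the limit, consumer leaves the group
// parallelBatchMillis(500, 700, 15) == 23_800 -> well within the limit
```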


Solution

1. Batch-Fetch and In-Memory Map Structure

Before any concurrent code runs, all the data the batch needs is fetched in bulk. Instead of issuing a separate query per message, each entity type is loaded with a single batch query before processing begins, which solves the N+1 query problem at the application layer. The data is loaded into memory once, before the work is split into concurrent operations, which significantly reduces our reliance on the database. Using the associateBy function, we transform each result list into a map with constant-time (O(1)) lookups. The concurrent operations then read from these maps instead of each one querying the database; since the maps are never modified after they are built, sharing them across coroutines is safe.

Kotlin

// Deserialize the whole batch once
val messages = records.map { objectMapper.readValue(it.value(), UsagePoolRecord::class.java) }

// One query per entity type for the whole batch, indexed by ID for O(1) lookups
val usagePoolEntities = usagePoolRepository
    .findByIds(messages.map { it.usagePoolId.toBigInteger() }.distinct())
    .associateBy { it.usagePoolId }

val lockEntities = lockRepository
    .findByUserIds(messages.map { it.userId }.distinct())
    .associateBy { it.userId }
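The batch-fetch idea can be tried without a database. In this minimal sketch, UsagePool, FakeUsagePoolRepository, and loadLookup are hypothetical stand-ins for the article's entities and repositories; the fake repository counts its queries so the difference from one query per message is visible.

```kotlin
// Hypothetical entity and repository, used only to illustrate the batch-fetch idea
data class UsagePool(val usagePoolId: Long, val quota: Int)

class FakeUsagePoolRepository(private val rows: List<UsagePool>) {
    var queryCount = 0
        private set

    // One query for many IDs (in SQL, roughly: WHERE usage_pool_id IN (...))
    fun findByIds(ids: Collection<Long>): List<UsagePool> {
        queryCount++
        val wanted = ids.toSet()
        return rows.filter { it.usagePoolId in wanted }
    }
}

// Fetch once for the whole batch, then index by ID for O(1) average lookups
fun loadLookup(repo: FakeUsagePoolRepository, ids: List<Long>): Map<Long, UsagePool> =
    repo.findByIds(ids.distinct()).associateBy { it.usagePoolId }

// loadLookup(repo, listOf(1L, 2L, 1L, 2L)) issues 1 query instead of 4
```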


2. Structured Concurrency

Memory Management With Chunking

The chunk structure serves two purposes:

  1. It prevents coroutines from being created for the entire batch at once, which avoids unnecessary memory usage.
  2. The results of a chunk are handed off for writing only after all of its coroutines have completed, so unnecessary connection pool consumption is avoided.
Kotlin

messages.chunked(150).forEach { chunk ->
    // Each chunk of 150 records is processed concurrently
}
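chunked keeps the requested chunk size and puts the remainder in the last chunk. A small sketch (chunkSizes is a hypothetical helper) for a batch of 500 records, the default max.poll.records:

```kotlin
// Sizes of the chunks produced by chunked(); the last chunk holds the remainder
fun chunkSizes(batchSize: Int, chunkSize: Int): List<Int> =
    List(batchSize) { it }.chunked(chunkSize).map { it.size }

// chunkSizes(500, 150) == [150, 150, 150, 50]:
// at most 150 coroutines exist at once instead of 500
```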


Resource Isolation With limitedParallelism

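Why limitedParallelism? If the database connection pool has, for example, X connections, keeping the parallelism limit below X ensures that the coroutines cannot exhaust the pool on their own. This prevents "Connection Timeout" errors and leaves connections free for the rest of the application.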

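Kotlin

import kotlinx.coroutines.*

// Create the limited view ONCE and reuse it: every limitedParallelism() call
// returns a new, independent view with its own limit
private val dbDispatcher = Dispatchers.IO.limitedParallelism(15)

// Inside the listener, after the batch-fetch step
runBlocking {
    messages.chunked(150).forEach { chunk ->
        val deferredResults = chunk.map { record ->
            async(dbDispatcher) { // Child of runBlocking, so it is structured
                try {
                    processRecord(record, usagePoolEntities, lockEntities)
                } catch (e: CancellationException) {
                    throw e // Never swallow cancellation
                } catch (e: Exception) {
                    log.error("Operation error for usagePoolId=${record.usagePoolId}", e)
                    buildErrorRecord(record, e)
                }
            }
        }
        val results = deferredResults.awaitAll() // Structured waiting
        collectAndAggregate(results)
    }
}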


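  • Dispatchers.IO.limitedParallelism(X) returns a view of the IO dispatcher that runs at most X coroutines at the same time. Because each call creates a new, independent view, it must be created once and shared; the blocking DB calls then never use more than X connections at once, so the connection pool is not exhausted.
  • async starts each coroutine and returns a Deferred that holds its result.
  • awaitAll() waits for all coroutines in the chunk to finish before proceeding to the next chunk.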
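These points can be checked with a small experiment. This sketch assumes kotlinx-coroutines-core on the classpath; peakParallelism is a hypothetical helper, and Thread.sleep stands in for a blocking JDBC call. One shared limited view is created per call, and the helper records how many tasks were running at the same moment. The @OptIn annotation covers kotlinx.coroutines versions in which limitedParallelism is still marked experimental.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Runs `tasks` blocking calls on one shared view limited to `limit`
// and returns the highest number of calls observed running at once
@OptIn(ExperimentalCoroutinesApi::class)
fun peakParallelism(tasks: Int, limit: Int, blockMillis: Long = 20): Int {
    val dispatcher = Dispatchers.IO.limitedParallelism(limit) // created once, shared by all tasks
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    runBlocking {
        List(tasks) {
            async(dispatcher) {
                val now = running.incrementAndGet()
                peak.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                Thread.sleep(blockMillis) // stands in for a blocking JDBC call
                running.decrementAndGet()
            }
        }.awaitAll()
    }
    return peak.get()
}
```

With 40 tasks and a limit of 4, the observed peak should never exceed 4. If a new view were created for every task instead, each task would get its own independent limit and the cap would not apply.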

runBlocking

runBlocking blocks the calling thread, here the Kafka listener thread, until every coroutine started inside it has completed. This is the correct approach here because:

  1. It keeps the Kafka consumer blocked until all records in the batch are processed, so the offset commit happens only after the whole batch has been handled.
  2. Inside the runBlocking block, the records are still processed in parallel.
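The ordering guarantee can be seen in a minimal sketch that simulates the listener thread. It assumes kotlinx-coroutines-core; handleBatch and pollAndCommit are hypothetical, delay stands in for non-blocking I/O, and the "offsets-committed" event stands in for the container's offset commit after the listener method returns.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Blocks the calling thread until every record in every chunk is processed
fun handleBatch(records: List<Int>, chunkSize: Int, events: MutableList<String>): List<String> =
    runBlocking {
        val results = records.chunked(chunkSize).flatMap { chunk ->
            chunk.map { record ->
                async(Dispatchers.Default) {
                    delay(10) // suspending stand-in for non-blocking I/O
                    "processed-$record"
                }
            }.awaitAll() // the next chunk starts only after this one completes
        }
        events += "all-records-processed"
        results
    }

// Hypothetical stand-in for the listener container: it commits only after
// handleBatch has returned, i.e. after runBlocking has finished
fun pollAndCommit(records: List<Int>, events: MutableList<String>): List<String> {
    val results = handleBatch(records, chunkSize = 3, events = events)
    events += "offsets-committed"
    return results
}
```

Because awaitAll() returns results in the same order as the list of Deferred values, the results come back in record order even though the records in a chunk run in parallel.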

3. Thread-Safe Result Structure

After awaitAll() returns, all results are collected in thread-safe queues, and then a single batch write is performed. If coroutines running in parallel append their results to a shared plain MutableList (such as an ArrayList), writes can be lost.

For this, lock-free data structures should be preferred. ConcurrentLinkedQueue uses CAS (Compare-And-Swap) operations instead of synchronized blocks, which typically gives better performance under high write contention.

Why Use ConcurrentLinkedQueue?

Coroutines running in parallel write to a shared result collection at the same time. MutableList is not thread-safe, so using it here leads to race conditions. ConcurrentLinkedQueue, by contrast, is designed for safe concurrent writes and performs well under contention.

Kotlin

import java.util.concurrent.ConcurrentLinkedQueue

// Thread-safe buckets that parallel coroutines can append to directly
data class AggregatedRecords(
    val processedSave: ConcurrentLinkedQueue<ProcessedEntity> = ConcurrentLinkedQueue(),
    val toDelete: ConcurrentLinkedQueue<UsagePoolEntity> = ConcurrentLinkedQueue(),
    val retryQueue: ConcurrentLinkedQueue<RetryEntity> = ConcurrentLinkedQueue()
)
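A minimal, stdlib-only sketch of many writers sharing one ConcurrentLinkedQueue; plain threads stand in for coroutines running on Dispatchers.IO, and collectConcurrently is a hypothetical helper.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Several threads append to one shared queue at the same time.
// ConcurrentLinkedQueue is lock-free (CAS-based), so no element is lost.
fun collectConcurrently(threads: Int, itemsPerThread: Int): ConcurrentLinkedQueue<Int> {
    val queue = ConcurrentLinkedQueue<Int>()
    val workers = (0 until threads).map { t ->
        thread {
            repeat(itemsPerThread) { i -> queue.add(t * itemsPerThread + i) }
        }
    }
    workers.forEach { it.join() } // wait for every writer before reading
    return queue
}
```

Swapping the queue for an ArrayList in the same experiment may lose elements or throw an exception, which is the race condition described above.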


Handling DataIntegrityViolationException is important. When two consumer instances process the same record (for example, when a record is redelivered after a rebalance), one of them hits a unique constraint violation. Instead of failing the entire batch, the records of that batch are saved one by one, and only the duplicates are skipped.

Kotlin

aggregatedRecords.processedSave
    .chunked(150)
    .forEach { batch ->
        try {
            processedRepository.saveAll(batch)
        } catch (e: DataIntegrityViolationException) {
            batch.forEach { record ->
                try {
                    processedRepository.save(record)
                } catch (duplicate: DataIntegrityViolationException) {
                    // Already saved by another consumer instance: skip it
                }
            }
        }
    }


4. Error Tolerance in Write Operations

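Batch writes with saveAll are efficient. However, a "Unique Constraint" violation in a single record can cause the entire batch to fail.

The following fallback is critical for meeting idempotency requirements. It assumes that saveAll and save each run in their own transaction: inside a surrounding transaction, the first failure marks that transaction rollback-only, and the fallback saves would be rolled back as well. It also catches only unique constraint violations; optimistic locking conflicts surface as OptimisticLockingFailureException and need their own handling.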

Kotlin

import org.springframework.dao.DataIntegrityViolationException

aggregatedRecords.processedSave.chunked(150).forEach { batch ->
    try {
        processedRepository.saveAll(batch)
    } catch (e: DataIntegrityViolationException) {
        // Fallback: try one by one if the batch fails
        batch.forEach { record ->
            try {
                processedRepository.save(record)
            } catch (innerException: DataIntegrityViolationException) {
                log.warn("Duplicate record skipped: ${record.id}")
            }
        }
    }
}
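To see the fallback logic in isolation, the sketch below replaces the Spring repository with a hypothetical in-memory FakeProcessedStore whose saveAll is all-or-nothing, and uses a hypothetical DuplicateKeyViolation in place of DataIntegrityViolationException. It models the control flow only, not JPA transaction behavior.

```kotlin
// Hypothetical stand-ins: a unique-key store and its constraint-violation exception
class DuplicateKeyViolation(id: Long) : RuntimeException("duplicate id $id")

class FakeProcessedStore(existing: Set<Long> = emptySet()) {
    val ids = existing.toMutableSet()

    // All-or-nothing batch insert, like one transaction around saveAll
    fun saveAll(batch: List<Long>) {
        val duplicate = batch.firstOrNull { it in ids }
        if (duplicate != null) throw DuplicateKeyViolation(duplicate)
        ids += batch
    }

    fun save(id: Long) {
        if (!ids.add(id)) throw DuplicateKeyViolation(id)
    }
}

// Batch insert first; on a constraint violation, retry one by one and skip duplicates
fun saveWithFallback(store: FakeProcessedStore, records: List<Long>, chunkSize: Int): List<Long> {
    val skipped = mutableListOf<Long>()
    records.chunked(chunkSize).forEach { batch ->
        try {
            store.saveAll(batch)
        } catch (e: DuplicateKeyViolation) {
            batch.forEach { id ->
                try {
                    store.save(id)
                } catch (inner: DuplicateKeyViolation) {
                    skipped += id // already written by another consumer instance
                }
            }
        }
    }
    return skipped
}
```

Only the chunk that contains a duplicate pays the one-by-one cost; every other chunk is still written in a single call.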


5. Data Flow Diagram

  • Ingress: The listener receives the Kafka batch and enters runBlocking.
  • Preparation: All necessary context data is retrieved from the DB in bulk.
  • Execution: Coroutines are started asynchronously, chunk by chunk.
  • Synchronization: awaitAll() acts as a barrier that waits for every coroutine in the chunk to complete.
  • Egress: The collected results are persisted with saveAll.
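The five stages can be combined in one compact sketch. It assumes kotlinx-coroutines-core; UsageMessage, UserLimit, runPipeline, and the fetchLimits/persist function parameters are hypothetical stand-ins for the Kafka records, the repositories, and saveAll, and the chunk size and parallelism are deliberately small.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical message and reference-data types used only for this sketch
data class UsageMessage(val userId: String, val amount: Int)
data class UserLimit(val userId: String, val max: Int)

@OptIn(ExperimentalCoroutinesApi::class)
fun runPipeline(
    batch: List<UsageMessage>,
    fetchLimits: (Collection<String>) -> List<UserLimit>, // stands in for one bulk query
    persist: (List<String>) -> Unit                      // stands in for one saveAll call
): Int = runBlocking { // Ingress: the listener thread blocks here
    // Preparation: bulk-fetch reference data once and index it by user ID
    val limits = fetchLimits(batch.map { it.userId }.toSet()).associateBy { it.userId }
    val dispatcher = Dispatchers.IO.limitedParallelism(4) // one shared view for the batch
    val accepted = ConcurrentLinkedQueue<String>()

    batch.chunked(2).forEach { chunk ->
        // Execution: one coroutine per record, parallelism capped by the shared view
        chunk.map { msg ->
            async(dispatcher) {
                val max = limits[msg.userId]?.max ?: 0 // read-only map, safe to share
                val ok = msg.amount <= max
                if (ok) accepted.add("${msg.userId}:${msg.amount}")
                ok
            }
        }.awaitAll() // Synchronization: barrier at the end of each chunk
    }

    // Egress: a single write for everything collected
    persist(accepted.sorted())
    accepted.size
}
```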

[Figure: Data flow diagram]

Performance Analysis and Results

[Figure: Performance analysis and results]

Conclusion

Processing Kafka messages in Spring Boot with Kotlin Coroutines not only increases throughput but also improves code readability and makes resource usage deterministic (predictable), since the parallelism limit and the chunk size bound how many connections and coroutines are in use at any time.

Using runBlocking builds a bridge between the blocking Kafka consumer thread and the world of suspending functions without disrupting Kafka's offset management mechanism.

Dependencies

XML

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>1.7.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>



Opinions expressed by DZone contributors are their own.
