Request Tracing in Spring Cloud Stream Data Pipelines With Kafka Binder
This article explains how to implement request tracing in Spring Cloud Stream data pipelines with the Kafka binder.
What Is a Data Pipeline?
A data pipeline is a process that transfers data from one or more sources to a destination, either in batches or as a stream. Along the way, raw data is filtered, cleaned, transformed, enriched, and fed into data lakes or data warehouses. In an enterprise, data is scattered across multiple systems in different formats; data pipelines collect it from all of those sources and bring it into a common format so that it is suitable for business intelligence and analytics.
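The stages named above can be illustrated with a toy, in-memory pipeline. This is only a sketch: the class name PipelineSketch, the run method, and the "source=demo" tag are made up for illustration, and a real pipeline would read from and write to external systems rather than a list.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical toy pipeline: filter, clean, transform, and enrich raw records. */
final class PipelineSketch {

    private PipelineSketch() {
    }

    static List<String> run(List<String> rawRecords) {
        return rawRecords.stream()
                .filter(Objects::nonNull)            // filter: drop missing records
                .map(String::trim)                   // clean: strip surrounding whitespace
                .filter(record -> !record.isEmpty()) // filter: drop records that are empty after cleaning
                .map(String::toUpperCase)            // transform: normalize to one format
                .map(record -> record + "|source=demo") // enrich: attach an (assumed) source tag
                .collect(Collectors.toList());
    }
}
```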
What Is Request Tracing?
In a distributed system, a request travels through multiple services before it completes. Those services can be hosted in different network zones, on different VMs, with different cloud providers, or any combination of these. Triaging an issue in such an environment is tedious and time-consuming, and this is where request tracing helps. A unique ID is minted at the origin of the request and carried forward to every system the request passes through; by searching for that ID, we can trace the journey of the request.
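The idea of minting an ID once and carrying it forward can be sketched without any framework. In the example below, the class RequestIdPropagation, the ensureRequestId method, and the use of a plain Map for headers are assumptions made for illustration; only the header name "requestId" matches the code later in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper: makes sure a set of message headers carries a request id. */
final class RequestIdPropagation {

    static final String REQUEST_ID = "requestId";

    private RequestIdPropagation() {
    }

    /** Returns a copy of the headers; a new id is minted only if none is present. */
    static Map<String, String> ensureRequestId(Map<String, String> headers) {
        Map<String, String> copy = new HashMap<>(headers);
        String existing = copy.get(REQUEST_ID);
        if (existing == null || existing.trim().isEmpty()) {
            copy.put(REQUEST_ID, UUID.randomUUID().toString());
        }
        return copy;
    }

    public static void main(String[] args) {
        // The origin has no id yet, so one is minted there.
        Map<String, String> atOrigin = ensureRequestId(new HashMap<>());
        // The next hop already sees an id and keeps it unchanged.
        Map<String, String> atNextHop = ensureRequestId(atOrigin);
        System.out.println(atOrigin.get(REQUEST_ID).equals(atNextHop.get(REQUEST_ID)));
    }
}
```

Because every hop reuses the ID it receives, log lines from all services can be correlated by that single value.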
What Is Spring Cloud Stream?
According to the Spring documentation, Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems. It is built on Spring Boot and Spring Integration and supports multiple messaging systems from various providers.
Implementation Details
To implement request tracing in Spring Cloud Stream, I used the preSend and afterSendCompletion methods of the ChannelInterceptor interface.
preSend is invoked before a message is sent to a Spring Cloud Stream channel. The overriding implementation checks whether a requestId exists in the Kafka headers of the incoming message; if it does not, a new requestId is minted and injected into the header. The requestId is then carried forward to the output Kafka messages.
afterSendCompletion is invoked after a send has completed, regardless of whether an exception was raised. The overriding implementation clears the requestId held in the ThreadLocal so that the next Kafka message gets its own requestId.
Below is the sample code:
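package com.example.demo.configuration;

import com.example.demo.requesttracing.RequestTracingContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

import java.util.UUID;

@Configuration
@Slf4j
public class CloudStreamInterceptor implements ChannelInterceptor {

    // Channel attribute that tells input channels apart from output channels
    static final String TYPE = "type";
    // Name of the Kafka header that carries the request id
    static final String REQUEST_ID = "requestId";

    protected RequestTracingContext requestTracingContext;

    public CloudStreamInterceptor(RequestTracingContext requestTracingContext) {
        this.requestTracingContext = requestTracingContext;
    }

    /**
     * Ensures the message carries a requestId header before it is sent to the channel for processing.
     *
     * @param message        the message about to be sent
     * @param messageChannel the channel the message is being sent to
     * @return the message to send; a rebuilt copy if a requestId header had to be added
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
        try {
            if (messageChannel instanceof DirectWithAttributesChannel) {
                DirectWithAttributesChannel channel = (DirectWithAttributesChannel) messageChannel;
                String requestId = (String) message.getHeaders().get(REQUEST_ID);
                Object channelType = channel.getAttribute(TYPE);
                // Messages are immutable, so the rebuilt message must be returned
                // for the new header to reach the channel.
                if (Sink.INPUT.equals(channelType)) {
                    return preSendInput(message, requestId);
                } else if (Source.OUTPUT.equals(channelType)) {
                    return preSendOutput(message, requestId);
                }
            }
        } catch (Exception ex) {
            // The trailing exception argument is logged with its stack trace.
            log.error("Exception while minting requestId. Details: Header {}, Message {}", message.getHeaders(), message, ex);
        }
        return message;
    }

    /**
     * Handles an incoming message: mints a requestId if the header is missing
     * and stores the id in the tracing context.
     *
     * @param message   the incoming message
     * @param requestId the requestId read from the message headers, possibly blank
     * @return the message, rebuilt with a requestId header if one was minted
     */
    private Message<?> preSendInput(Message<?> message, String requestId) {
        if (StringUtils.isBlank(requestId)) {
            requestId = UUID.randomUUID().toString();
            message = MessageBuilder.fromMessage(message).setHeader(REQUEST_ID, requestId).build();
        }
        requestTracingContext.setRequestId(requestId);
        return message;
    }

    /**
     * Handles an outgoing message: if the header is missing, carries forward
     * the requestId captured on the input side.
     *
     * @param message   the outgoing message
     * @param requestId the requestId read from the message headers, possibly blank
     * @return the message, rebuilt with the carried-forward requestId header if needed
     */
    private Message<?> preSendOutput(Message<?> message, String requestId) {
        if (StringUtils.isBlank(requestId) && StringUtils.isNotBlank(requestTracingContext.getRequestId())) {
            // Reuse the id from the context instead of minting a new one,
            // so that input and output messages share the same requestId.
            requestId = requestTracingContext.getRequestId();
            message = MessageBuilder.fromMessage(message).setHeader(REQUEST_ID, requestId).build();
        }
        requestTracingContext.setRequestId(requestId);
        return message;
    }

    /**
     * Clears the requestId held in the ThreadLocal after the send on the input channel completes.
     *
     * @param message        the message that was sent
     * @param messageChannel the channel the message was sent to
     * @param sent           whether the send succeeded
     * @param ex             the exception raised during the send, if any
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean sent, @Nullable Exception ex) {
        try {
            if (messageChannel instanceof DirectWithAttributesChannel) {
                DirectWithAttributesChannel channel = (DirectWithAttributesChannel) messageChannel;
                Object channelType = channel.getAttribute(TYPE);
                if (Sink.INPUT.equals(channelType)) {
                    requestTracingContext.clear();
                }
            }
        } catch (Exception e) {
            log.error("Exception while clearing requestId. Details: Header {}, Message {}", message.getHeaders(), message, e);
        }
    }
}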
The RequestTracingContext class sets, retrieves, and clears the requestId held in a ThreadLocal.
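A class like this could look roughly as follows. This is a minimal sketch and not the original implementation: only the method names setRequestId, getRequestId, and clear are taken from the interceptor code, while the field and the absence of Spring annotations are assumptions. In the application, the class would be public and registered as a Spring bean so that it can be injected into the interceptor.

```java
/**
 * Hypothetical sketch of a ThreadLocal-backed holder for the current requestId.
 * Declared package-private here so that the snippet compiles in any file;
 * it would need to be public to be imported from another package.
 */
class RequestTracingContext {

    // Each thread sees only the requestId it stored itself.
    private final ThreadLocal<String> requestId = new ThreadLocal<>();

    /** Stores the requestId for the current thread. */
    public void setRequestId(String id) {
        requestId.set(id);
    }

    /** Returns the requestId of the current thread, or null if none is set. */
    public String getRequestId() {
        return requestId.get();
    }

    /** Removes the requestId so that the next message on this thread starts clean. */
    public void clear() {
        requestId.remove();
    }
}
```

Calling clear() matters because consumer threads are reused: without it, a requestId from one message could leak into the processing of the next.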
Conclusion
Request tracing is helpful when dealing with asynchronous processing and with applications that handle heavy traffic.