How to Marry MDC With Spring Integration
This article explores the challenges of Mapped Diagnostic Context propagation in Spring integration to ensure the correct context persists across workflows.
Join the DZone community and get the full member experience.
Join For FreeIn modern enterprise applications, effective logging and traceability are critical for debugging and monitoring business processes. Mapped Diagnostic Context (MDC) provides a mechanism to enrich logging statements with contextual information, making it easier to trace requests across different components.
This article explores the challenges of MDC propagation in Spring integration and presents strategies to ensure that the diagnostic context remains intact as messages traverse these channels.
Let's start with a very brief overview of both technologies. If you are already familiar with them, you can go straight to the 'Marry Spring Integration with MDC' section.
Mapped Diagnostic Context
Mapped Diagnostic Context (MDC) plays a crucial role in logging by providing a way to enrich log statements with contextual information specific to a request, transaction, or process. This enhances traceability, making it easier to correlate logs across different components in a distributed system.
{
MDC.put("SOMEID", "xxxx");
runSomeProcess();
MDC.clear();
}
All the logging calls invoked inside runSomeProcess will have "SOMEID" in the context and could be added to log messages with the appropriate pattern in the logger configuration. I will use log4j2, but SL4J also supports MDC.
pattern="%d{HH:mm:ss} %-5p [%X{SOMEID}] [%X{TRC_ID}] - %m%n"
The %X placeholder in log4j2 outputs MDC values (in this case - SOMEID and TRC_ID).
Output:
18:09:19 DEBUG [SOMEIDVALUE] [] SomClass:XX - log message text
Here we can see that TRC_ID was substituted with an empty string as it was not set in the MDC context (so it does not affect operations, running out of context).
And logs, that are a terrible mess of threads:
19:54:03 49 DEBUG Service1:17 - process1. src length: 2
19:54:04 52 DEBUG Service2:22 - result: [77, 81, 61, 61]
19:54:04 52 DEBUG DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#4'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=MQ==, headers={SOMEID=30, id=abbff9b1-1273-9fc8-127d-ca78ffaae07a, timestamp=1747500844111}]
19:54:04 52 INFO IntegrationConfiguration:81 - Result: MQ==
19:54:04 52 DEBUG DirectChannel:191 - postSend (sent=true) on channel 'bean 'demoWorkflow.channel#4'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=MQ==, headers={SOMEID=30, id=abbff9b1-1273-9fc8-127d-ca78ffaae07a, timestamp=1747500844111}]
19:54:04 52 DEBUG QueueChannel:191 - postReceive on channel 'bean 'queueChannel-Q'; defined in: 'class path resource [com/fbytes/mdcspringintegration/integration/IntegrationConfiguration.class]'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.queueChannelQ()'', message: GenericMessage [payload=1, headers={SOMEID=31, id=d0b6c58d-457e-876c-a240-c36d36f7e4f5, timestamp=1747500838034}]
19:54:04 52 DEBUG PollingConsumer:313 - Poll resulted in Message: GenericMessage [payload=1, headers={SOMEID=31, id=d0b6c58d-457e-876c-a240-c36d36f7e4f5, timestamp=1747500838034}]
19:54:04 52 DEBUG ServiceActivatingHandler:313 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1907874b] (demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#4) received message: GenericMessage [payload=1, headers={SOMEID=31, id=d0b6c58d-457e-876c-a240-c36d36f7e4f5, timestamp=1747500838034}]
19:54:04 52 DEBUG Service2:16 - encoding 1
19:54:04 49 DEBUG Service1:24 - words processed: 1
19:54:04 49 DEBUG QueueChannel:191 - preSend on channel 'bean 'queueChannel-Q'; defined in: 'class path resource [com/fbytes/mdcspringintegration/integration/IntegrationConfiguration.class]'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.queueChannelQ()'', message: GenericMessage [payload=1, headers={id=6a67a5b4-724b-6f54-4e9f-acdeb2a7a235, timestamp=1747500844114}]
19:54:04 49 DEBUG QueueChannel:191 - postSend (sent=true) on channel 'bean 'queueChannel-Q'; defined in: 'class path resource [com/fbytes/mdcspringintegration/integration/IntegrationConfiguration.class]'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.queueChannelQ()'', message: GenericMessage [payload=1, headers={SOMEID=37, id=07cf749d-741e-640c-eb4f-f9bcd293dbcd, timestamp=1747500844114}]
19:54:04 49 DEBUG DirectChannel:191 - postSend (sent=true) on channel 'bean 'demoWorkflow.channel#3'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=gd, headers={id=e7aedd50-8075-fa2a-9dd3-c11956e0d296, timestamp=1747500843637}]
19:54:04 49 DEBUG DirectChannel:191 - postSend (sent=true) on channel 'bean 'demoWorkflow.channel#2'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=gd, headers={id=e7aedd50-8075-fa2a-9dd3-c11956e0d296, timestamp=1747500843637}]
19:54:04 49 DEBUG DirectChannel:191 - postSend (sent=true) on channel 'bean 'demoWorkflow.channel#1'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=(37,gd), headers={id=3048a04c-ff44-e2ce-98a4-c4a84daa0656, timestamp=1747500843636}]
19:54:04 49 DEBUG DirectChannel:191 - postSend (sent=true) on channel 'bean 'demoWorkflow.channel#0'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=(37,gd), headers={id=d76dff34-3de5-e830-1f6b-48b337e0c658, timestamp=1747500843636}]
19:54:04 49 DEBUG SourcePollingChannelAdapter:313 - Poll resulted in Message: GenericMessage [payload=(38,g), headers={id=495fe122-df04-2d57-dde2-7fc045e8998f, timestamp=1747500844114}]
19:54:04 49 DEBUG DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#0'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=(38,g), headers={id=495fe122-df04-2d57-dde2-7fc045e8998f, timestamp=1747500844114}]
19:54:04 49 DEBUG ServiceActivatingHandler:313 - ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@7efd28bd] (demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0) received message: GenericMessage [payload=(38,g), headers={id=495fe122-df04-2d57-dde2-7fc045e8998f, timestamp=1747500844114}]
19:54:04 49 DEBUG DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#1'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=(38,g), headers={id=1790d3d8-9501-f479-c5ee-6b9232295313, timestamp=1747500844114}]
19:54:04 49 DEBUG MessageTransformingHandler:313 - bean 'demoWorkflow.transformer#0' for component 'demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()' received message: GenericMessage [payload=(38,g), headers={id=1790d3d8-9501-f479-c5ee-6b9232295313, timestamp=1747500844114}]
19:54:04 49 DEBUG DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#2'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=g, headers={id=e2f69d41-f760-2f4d-87c2-4e990beefdaa, timestamp=1747500844114}]
19:54:04 49 DEBUG MessageFilter:313 - bean 'demoWorkflow.filter#0' for component 'demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()' received message: GenericMessage [payload=g, headers={id=e2f69d41-f760-2f4d-87c2-4e990beefdaa, timestamp=1747500844114}]
19:54:04 49 DEBUG DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#3'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=g, headers={id=e2f69d41-f760-2f4d-87c2-4e990beefdaa, timestamp=1747500844114}]
19:54:04 49 DEBUG ServiceActivatingHandler:313 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@1e469dfd] (demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#3) received message: GenericMessage [payload=g, headers={id=e2f69d41-f760-2f4d-87c2-4e990beefdaa, timestamp=1747500844114}]
19:54:04 49 DEBUG Service1:17 - process1. src length: 1
19:54:04 49 DEBUG Service1:24 - words processed: 1
It will become readable, and even the internal Spring Integration messages are attached to specific SOMEID processing.
19:59:44 49 DEBUG [19] [] Service1:17 - process1. src length: 3
19:59:45 52 DEBUG [6] [] Service2:22 - result: [77, 119, 61, 61]
19:59:45 52 DEBUG [6] [] DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#4'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=Mw==, headers={SOMEID=6, id=b19eb8b6-7c5b-aa5a-31d0-dc9b940e4cd9, timestamp=1747501185064}]
19:59:45 52 INFO [6] [] IntegrationConfiguration:81 - Result: Mw==
19:59:45 52 DEBUG [6] [] DirectChannel:191 - postSend (sent=true) on channel 'bean 'demoWorkflow.channel#4'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=Mw==, headers={SOMEID=6, id=b19eb8b6-7c5b-aa5a-31d0-dc9b940e4cd9, timestamp=1747501185064}]
19:59:45 52 DEBUG [6] [] QueueChannel:191 - postReceive on channel 'bean 'queueChannel-Q'; defined in: 'class path resource [com/fbytes/mdcspringintegration/integration/IntegrationConfiguration.class]'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.queueChannelQ()'', message: GenericMessage [payload=2, headers={SOMEID=7, id=5e4f9113-6520-c20c-afc8-f8e1520bf9e9, timestamp=1747501177082}]
19:59:45 52 DEBUG [7] [] PollingConsumer:313 - Poll resulted in Message: GenericMessage [payload=2, headers={SOMEID=7, id=5e4f9113-6520-c20c-afc8-f8e1520bf9e9, timestamp=1747501177082}]
19:59:45 52 DEBUG [7] [] ServiceActivatingHandler:313 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5d21202d] (demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#4) received message: GenericMessage [payload=2, headers={SOMEID=7, id=5e4f9113-6520-c20c-afc8-f8e1520bf9e9, timestamp=1747501177082}]
19:59:45 52 DEBUG [7] [] Service2:16 - encoding 2
19:59:45 53 DEBUG [] [] QueueChannel:191 - postReceive on channel 'bean 'queueChannel-Q'; defined in: 'class path resource [com/fbytes/mdcspringintegration/integration/IntegrationConfiguration.class]'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.queueChannelQ()'', message: GenericMessage [payload=2, headers={SOMEID=8, id=37400675-0f79-8a89-de36-dacf2feb106e, timestamp=1747501177343}]
19:59:45 53 DEBUG [8] [] PollingConsumer:313 - Poll resulted in Message: GenericMessage [payload=2, headers={SOMEID=8, id=37400675-0f79-8a89-de36-dacf2feb106e, timestamp=1747501177343}]
19:59:45 53 DEBUG [8] [] ServiceActivatingHandler:313 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5d21202d] (demoWorkflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#4) received message: GenericMessage [payload=2, headers={SOMEID=8, id=37400675-0f79-8a89-de36-dacf2feb106e, timestamp=1747501177343}]
19:59:45 53 DEBUG [8] [] Service2:16 - encoding 2
19:59:45 52 DEBUG [7] [] Service2:22 - result: [77, 103, 61, 61]
19:59:45 52 DEBUG [7] [] DirectChannel:191 - preSend on channel 'bean 'demoWorkflow.channel#4'; from source: 'com.fbytes.mdcspringintegration.integration.IntegrationConfiguration.demoWorkflow()'', message: GenericMessage [payload=Mg==, headers={SOMEID=7, id=bbb9f71f-37d8-8bc4-90c3-bfb813430e4a, timestamp=1747501185469}]
19:59:45 52 INFO [7] [] IntegrationConfiguration:81 - Result: Mg==
Under the hood, MDC uses ThreadLocal storage, tying the context to the current thread. This works seamlessly in single-threaded flows but requires special handling in multi-threaded scenarios, such as Spring Integration’s queue channels.
Spring Integration
A great part of Spring, allowing a new level of services decoupling by building the application workflow where data is passed between services as a message, defining what method of the service to invoke for data processing, rather than making direct service-to-service calls.
IntegrationFlow flow = new IntegrationFlow.from("sourceChannel")
.handle("service1", "runSomeProcess")
.filter(....)
.transform(...)
.split()
.channel("serviceInterconnect")
.handle("service2", "runSomeProcess")
.get();
Here we:
- Get data from "sourceChannel" (assuming a bean with such a name already registered);
- Invoke service1.runSomeProcess passing the data (unwrapped from Message<?> of Spring Integration)
- Returned result (whatever it is) is wrapped back in Message, undergoes some filtering and transformations;
- Result (assuming it is some array or Stream), split for per-entry processing;
- Entries (wrapped in Message) passed to "serviceInterconnect" channel;
- Entries processed by service2.runSomeProcess;
Spring integration provides message channels of several types. What is important here is that some of them run the consumer process on a produced thread, while others (e.g., the Queue channel) delegate the processing to other consumer threads. The thread-local MDC context will be lost.
So, we need to find a way to propagate it down the workflow.
Marry Spring Integration With MDC
While micrometer-tracing propagates MDC between microservices, it doesn’t handle Spring integration’s queue channels, where thread switches occur. To maintain the MDC context, it must be stored in message headers on the producer side and restored on the consumer side. Below are three methods to achieve this:
- Use Spring Integration Advice;
- Use Spring-AOP @Aspect;
- Use Spring Integration ChannelInterceptor.
1. Using Spring Integration Advice
@Service
class MdcAdvice implements MethodInterceptor {
@Autowired
IMDCService mdcService;
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message<?> message = (Message<?>) invocation.getArguments()[0];
Map<String, String> mdcMap = (Map<String, String>) message.getHeaders().entrySet().stream()
.filter(...)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
MDCService.set(mdcMap);
try {
return invocation.proceed();
} finally {
MDCService.clear(mdcMap);
}
}
}
It should be directly specified for the handler in the workflow, e.g.:
.handle("service1", "runSomeProcess", epConfig -> epConfig.advice(mdcAdvice))
Disadvantages
- It covers only the handler. Context cleared right after it, and thus, logging of the processes between handlers will have no context.
- It should be manually added to all handlers.
2. Using Spring-AOP @Aspect
@Aspect
@Component
public class MdcAspect {
@Autowired
IMDCService mdcService;
@Around("execution(* org.springframework.messaging.MessageHandler.handleMessage(..))")
public Object aroundHandleMessage(ProceedingJoinPoint joinPoint) throws Throwable {
Message<?> message = (Message<?>) joinPoint.getArgs()[0];
Map<String, String> mdcMap = (Map<String, String>) message.getHeaders().entrySet().stream()
.filter(...)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> (String) entry.getValue()));
mdcService.setContextMap(mdcMap);
try {
return joinPoint.proceed();
} finally {
mdcService.clear(mdcMap);
}
}
}
Disadvantages
- It should automatically be invoked, but.. only for "stand-alone" MesssageHandlers. For those defined inline, e.g., it won't work, because the handler is not a proxied bean in this case.
.handle((msg,headers) -> { return service1.runSomeProcess(); }
- It covers only the handlers, too.
3. Using Spring Integration ChannelInterceptor
First, we need to clear the context at the end of the processing. It can be done by defining the custom TaskDecorator:
@Service
public class MdcClearingTaskDecorator implements TaskDecorator {
private static final Logger logger = LogManager.getLogger(MdcClearingTaskDecorator.class);
private final MDCService mdcService;
public MdcClearingTaskDecorator(MDCService mdcService) {
this.mdcService = mdcService;
}
@Override
public Runnable decorate(Runnable runnable) {
return () -> {
try {
runnable.run();
} finally {
logger.debug("Cleaning the MDC context");
mdcService.clearMDC();
}
};
}
}
And set it for all TaskExecutors:
@Bean(name = "someTaskExecutor")
public TaskExecutor someTaskExecutor() {
ThreadPoolTaskExecutor executor = newThreadPoolExecutor(mdcService);
executor.setTaskDecorator(mdcClearingTaskDecorator);
executor.initialize();
return executor;
}
Used by pollers:
@Bean(name = "somePoller")
public PollerMetadata somePoller() {
return Pollers.fixedDelay(Duration.ofSeconds(30))
.taskExecutor(someTaskExecutor())
.getObject();
}
Inline:
.from(consoleMessageSource,
c -> c.poller(p -> p.fixedDelay(1000).taskExecutor(someTaskExecutor())))
Now, we need to save and restore the context as it passes the Pollable channels.
@Service
@GlobalChannelInterceptor(patterns = {"*-Q"})
public class MdcChannelInterceptor implements ChannelInterceptor {
private static final Logger logger = LogManager.getLogger(MdcChannelInterceptor.class);
@Value("${mdcspringintegration.mdc_header}")
private String mdcHeader;
@Autowired
private MDCService mdcService;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
if (!message.getHeaders().containsKey(mdcHeader)) {
return MessageBuilder.fromMessage(message)
.setHeader(mdcHeader, mdcService.fetch(mdcHeader)) // Add a new header
.build();
}
if (channel instanceof PollableChannel) {
logger.trace("Cleaning the MDC context for PollableChannel");
mdcService.clearMDC(); // clear MDC in producer's thread
}
return message;
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
if (channel instanceof PollableChannel) {
logger.trace("Setting MDC context for PollableChannel");
Map<String, String> mdcMap = message.getHeaders().entrySet().stream()
.filter(entry -> entry.getKey().equals(mdcHeader))
.collect(Collectors.toMap(Map.Entry::getKey, entry -> (String) entry.getValue()));
mdcService.setMDC(mdcMap);
}
return message;
}
}
- preSend is invoked on the producer thread before the message is added to the Queue and cleans the context (of the producer's thread)
- postReceive is invoked on the consumer thread before the message is processed by the consumer.
This approach covers not only the handlers, but also the workflow (interrupting on queues only).
@GlobalChannelInterceptor(patterns = {"*-Q"})
– automatically attaches the interceptor to all channels that match the pattern(s).
A few words about the cleaning section of preSend. On first sight, it could look unnecessary, but let's see the thread's path when it encounters the Split.
The thread iterates the item and thus keeps the context after sending the doc to the queue. Red arrows are showing places where the context will be leaked from doc1 processing to doc2 processing and from doc2 to doc3.
That's it. We get an MDC context end-to-end in the Spring integration workflow. Do you know a better way? Please share in the comments.
Example Code
Opinions expressed by DZone contributors are their own.
Comments