Integration

Integration refers to the process of combining software parts (or subsystems) into one system. An integration framework is a lightweight utility that provides libraries and standardized methods to coordinate messaging among different technologies. As software connects the world in increasingly complex ways, integration makes it all possible by facilitating app-to-app communication. Learn more about this necessity for modern software development by keeping a pulse on industry topics such as integrated development environments, API best practices, service-oriented architecture, enterprise service buses, communication architectures, integration testing, and more.

Latest Premium Content
Trend Report
Modern API Management
Refcard #303
API Integration Patterns
Refcard #249
GraphQL Essentials

DZone's Featured Integration Resources

Spring AI Advisors: Chat Memory, Token Tracking, and Message Logging


By Horatiu Dan (DZone Core)
Abstract

The previous two articles in this series, Building a Spring AI Assistant with MCP Servers: A Step-by-Step Tutorial and Securing the AI Host and Spring AI MCP Server Communication with API Keys, laid the groundwork for moving from prototype to production when building business-driven Spring AI applications. This last article concludes the tutorial.

Why Advisors?

When you build something with Spring AI's ChatClient, sooner or later you want behavior that applies to every request: keep conversation history so the next prompt has context, count tokens so you know what each call costs, log the raw request and response payloads when something goes wrong. Threading that logic through your service code, one method at a time, is exactly the kind of cross-cutting concern Aspect-Oriented Programming was invented for, and Spring AI's advisors are essentially that: AOP for the AI call path.

This article walks through three advisors working together on a Spring AI chat client running on Java 25 with Spring AI 1.1.4: the built-in MessageChatMemoryAdvisor, plus two custom ones, a TokenUsageAdvisor that tracks token consumption and a MessageLoggerAdvisor that records the full request/response payloads. The example assumes the chat client is already wired up to one or more MCP servers exposing tools, but the advisor mechanism applies identically to a chat client with no tools at all.
The Advisor Contract

The interface the custom advisors in this article implement is BaseAdvisor, which extends CallAdvisor (and StreamAdvisor). Its default adviseCall behavior is as follows:

Java

    public interface BaseAdvisor extends CallAdvisor, StreamAdvisor {

        @Override
        default ChatClientResponse adviseCall(ChatClientRequest chatClientRequest, CallAdvisorChain callAdvisorChain) {
            ChatClientRequest processedChatClientRequest = before(chatClientRequest, callAdvisorChain);
            ChatClientResponse chatClientResponse = callAdvisorChain.nextCall(processedChatClientRequest);
            return after(chatClientResponse, callAdvisorChain);
        }

        // remaining members omitted
    }

Three clear steps are outlined: logic is executed before the rest of the advisor chain, the rest of the advisors are called, and logic is executed after. Depending on the advisor type, the before part, the after part, or both are addressed; either way, the advisor chain is invoked and the response is returned.

One last consideration concerns the advisors' order of execution, given by the Ordered#getOrder() method. Advisors with higher precedence (lower order value) run before those with lower precedence (higher order value) when before() processes the request. The order is reversed when after() processes the response: lower-precedence advisors run before higher-precedence ones. The picture below visually summarizes this detail.

Just as in the previous two parts, to follow along, switch to the 3-main branch of the designated GitHub repository, address the existing TODOs, and complete the implementation.

Memory: Making the Conversation Stateful

The simplest useful advisor ships with Spring AI. By default, a ChatClient is stateless: each call sees only the system prompt and the current user message. MessageChatMemoryAdvisor fixes that by maintaining a windowed history and injecting it into each prompt.
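To make the idea of a windowed history concrete, here is a minimal plain-Java sketch. It is not Spring AI's MessageWindowChatMemory implementation: the class name WindowedHistory and its methods are hypothetical, messages are reduced to plain strings, and any special treatment of system messages is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of a sliding-window history; not Spring AI code. */
class WindowedHistory {

    private final int maxMessages;
    private final Deque<String> messages = new ArrayDeque<>();

    WindowedHistory(int maxMessages) {
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be positive");
        }
        this.maxMessages = maxMessages;
    }

    /** Stores a message and evicts the oldest ones once the window is full. */
    void add(String message) {
        messages.addLast(message);
        while (messages.size() > maxMessages) {
            messages.removeFirst();
        }
    }

    /** Builds the outgoing prompt: retained history first, then the new user message. */
    List<String> promptFor(String userMessage) {
        List<String> prompt = new ArrayList<>(messages);
        prompt.add(userMessage);
        return prompt;
    }

    /** Returns a copy of the retained history, oldest message first. */
    List<String> snapshot() {
        return new ArrayList<>(messages);
    }

    /** Drops the whole history, analogous to clearing a conversation. */
    void clear() {
        messages.clear();
    }
}
```

With a window of two, adding three messages leaves only the last two in the history, and promptFor() prepends them to the next user message without modifying the stored history.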
Configure the memory bean first (here, a sliding window of 50 messages):

Java

    @Bean
    public ChatMemory chatMemory() {
        return MessageWindowChatMemory.builder()
                .maxMessages(50)
                .build();
    }

Then register the advisor on the ChatClient:

Java

    chatClient = builder
            .defaultSystem("You are a helpful Telecom AI assistant. Provide short, meaningful answers.")
            .defaultAdvisors(MessageChatMemoryAdvisor.builder(chatMemory).build())
            .build();

That's it. The advisor stores each USER and ASSISTANT message in the underlying ChatMemory and prepends the relevant slice to each outgoing prompt. To erase a conversation, clear the memory store:

Java

    public void clearConversation() {
        chatMemory.clear(DEFAULT_CONVERSATION_ID);
    }

A Token Usage Advisor

Every call to an LLM has a cost: money, latency, or both. Knowing what each interaction consumes is something you want from day one, not something you bolt on after the bill. Spring AI provides ChatResponseMetadata#getUsage() with the actual numbers reported by the provider; we just need an advisor to read it, accumulate it, and (optionally) estimate the prompt-side cost before the call.

TODO 1. Add a new advisor that tracks the token usage.
Java

    public class TokenUsageAdvisor implements BaseAdvisor {

        private static final Logger log = LoggerFactory.getLogger(TokenUsageAdvisor.class);

        // Running totals across all calls handled by this advisor instance
        private final AtomicInteger promptTokenCount = new AtomicInteger(0);
        private final AtomicInteger completionTokenCount = new AtomicInteger(0);
        private final AtomicInteger totalTokenCount = new AtomicInteger(0);

        private final int order;
        private final TokenCountEstimator tokenCountEstimator;

        public TokenUsageAdvisor(int order) {
            this.order = order;
            tokenCountEstimator = new JTokkitTokenCountEstimator();
        }

        @Override
        public ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {
            List<Message> messages = chatClientRequest.prompt().getInstructions();

            // Estimate the prompt size locally, before the provider reports actual usage
            int tokenCount = messages.stream()
                    .mapToInt(msg -> {
                        var text = switch (msg) {
                            case UserMessage userMsg -> userMsg.getText();
                            case AssistantMessage assistantMsg -> assistantMsg.getText();
                            case SystemMessage systemMsg -> systemMsg.getText();
                            default -> "";
                        };
                        return tokenCountEstimator.estimate(text);
                    })
                    .sum();

            log.debug("Request: {} messages ~ {} estimated tokens.", messages.size(), tokenCount);
            return chatClientRequest;
        }

        @Override
        public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {
            // Read the usage reported by the provider, if any, and accumulate it
            Optional.ofNullable(chatClientResponse.chatResponse())
                    .map(ChatResponse::getMetadata)
                    .map(ChatResponseMetadata::getUsage)
                    .ifPresent(usage -> {
                        int currentPrompt = usage.getPromptTokens();
                        int currentCompletion = usage.getCompletionTokens();
                        int currentTotal = usage.getTotalTokens();
                        log.info("Current tokens - \nPrompt: {} Completion: {} Total: {}",
                                currentPrompt, currentCompletion, currentTotal);

                        int accPrompt = promptTokenCount.addAndGet(currentPrompt);
                        int accCompletion = completionTokenCount.addAndGet(currentCompletion);
                        int accTotal = totalTokenCount.addAndGet(currentTotal);
                        log.info("Accumulated tokens - \nPrompt: {} Completion: {} Total: {}",
                                accPrompt, accCompletion, accTotal);
                    });
            return chatClientResponse;
        }

        @Override
        public int getOrder() {
            return order;
        }

        public int totalTokens() {
            return totalTokenCount.get();
        }

        public void clearUsage() {
            promptTokenCount.set(0);
            completionTokenCount.set(0);
            totalTokenCount.set(0);
        }
    }

In the before() stage, a JTokkitTokenCountEstimator instance is used to estimate how many tokens the user, assistant, and system messages represent. During after(), the actual prompt, completion, and total token counts reported by the provider are accumulated, which gives an objective view of consumption. The exposed totalTokens() and clearUsage() methods make it trivial to display the running total in a UI and reset it when the user clears the chat.

A Message Logger Advisor

When the LLM does something unexpected, you want the raw payloads. This advisor logs both halves of each exchange in JSON and, on the request side, also dumps the list of tools the model has been told about, which is useful when tool calls aren't happening for reasons that aren't obvious.

TODO 2. Add a second custom advisor, one that logs the messages. The goal is to have a few advisors in place (three in this tutorial) and to observe the order in which the chain executes them.
Java

    public class MessageLoggerAdvisor implements BaseAdvisor {

        private static final Logger log = LoggerFactory.getLogger(MessageLoggerAdvisor.class);

        private final int order;

        public MessageLoggerAdvisor(int order) {
            this.order = order;
        }

        @Override
        public ChatClientRequest before(ChatClientRequest chatClientRequest, AdvisorChain advisorChain) {
            Prompt prompt = chatClientRequest.prompt();

            // List the tool names the model has been told about, if tool calling is configured
            Object tools = "N/A";
            if (prompt.getOptions() instanceof ToolCallingChatOptions toolOptions) {
                tools = toolOptions.getToolCallbacks().stream()
                        .map(callback -> callback.getToolDefinition().name())
                        .toList();
            }
            log.info("Tools: {}", tools);

            // One JSON document per outgoing message
            String messages = prompt.getInstructions().stream()
                    .map(ModelOptionsUtils::toJsonString)
                    .collect(Collectors.joining("\n"));
            log.info("Request:\n{}", messages);
            return chatClientRequest;
        }

        @Override
        public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorChain advisorChain) {
            // One JSON document per generation returned by the model
            String messages = Optional.ofNullable(chatClientResponse.chatResponse())
                    .map(ChatResponse::getResults)
                    .orElseGet(Collections::emptyList)
                    .stream()
                    .map(gen -> ModelOptionsUtils.toJsonString(gen.getOutput()))
                    .collect(Collectors.joining("\n"));
            log.info("Response:\n{}", messages);
            return chatClientResponse;
        }

        @Override
        public int getOrder() {
            return order;
        }
    }

Both before() and after() log the messages in JSON format; additionally, before() lists the available tools, which are exposed by the connected MCP servers. Both halves serialize messages with ModelOptionsUtils#toJsonString, which produces stable, parseable output. Production-bound code would want sampling, redaction, and an async log appender, but the structure stays the same.

Wiring Them All Together

TODO 3. As the custom advisors are ready, they can be used when the ChatClient is built, in the ChatAssistant constructor.
Java

    public ChatAssistant(ChatClient.Builder builder,
                         ToolCallbackProvider toolCallbackProvider,
                         ChatMemory chatMemory) {
        this.chatMemory = chatMemory;
        tokenUsageAdvisor = new TokenUsageAdvisor(1);

        chatClient = builder
                .defaultSystem("You are a helpful Telecom AI assistant. Provide short, meaningful answers.")
                .defaultToolCallbacks(toolCallbackProvider)
                .defaultAdvisors(MessageChatMemoryAdvisor.builder(chatMemory).build(),
                        tokenUsageAdvisor,
                        new MessageLoggerAdvisor(2))
                .build();
    }

With the chat memory advisor having the highest precedence, history is added to the prompt before the token advisor measures it and before the logger captures the actual outgoing messages. That is what you want, since otherwise you'd be counting and logging a prompt that doesn't reflect what the model actually sees.

The token advisor exposes its accumulator so the surrounding service can surface it and clear it alongside the memory:

Java

    public void clearConversation() {
        chatMemory.clear(DEFAULT_CONVERSATION_ID);
        tokenUsageAdvisor.clearUsage();
    }

    public int totalTokens() {
        return tokenUsageAdvisor.totalTokens();
    }

TODO 4. The last two methods are called by the controller as the user interacts with the telecom-assistant UI.

Java

    @GetMapping("/")
    public String home(Model model) {
        model.addAttribute("messages", assistant.conversationMessages());
        model.addAttribute("tokens", assistant.totalTokens());
        return "chat";
    }

Last but not least, the total token consumption is added to the top bar of chat.html.

HTML

    <div class="text-secondary small" th:text="|Messages: ${#lists.size(messages)}, Tokens: ${tokens}|"></div>

With all three applications up and running, let's issue the prompt 'What's the vendor of the invoices having 'vdf' in their number?' followed by 'Provide a short info for this vendor.' The responses are to the point, as in the image below. Both MCP servers contributed, and the chat memory played an important role as well, since the second prompt only makes sense in the context of the first.
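The ordering described above can be illustrated without Spring at all. The following is a simplified sketch, not Spring AI's chain implementation: NamedAdvisor and AdvisorOrderSketch are hypothetical names, and the order value used for the memory advisor is an assumed low number standing in for "highest precedence". The token and logger values (1 and 2) mirror the constructor above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation of advisor ordering; not Spring AI code. */
class AdvisorOrderSketch {

    /** Stand-in for an advisor: a name plus its order value. */
    record NamedAdvisor(String name, int order) {}

    /** Sorts advisors by order and records the sequence of before/after events. */
    static List<String> trace(List<NamedAdvisor> advisors) {
        List<NamedAdvisor> sorted = advisors.stream()
                .sorted(Comparator.comparingInt(NamedAdvisor::order))
                .toList();
        List<String> events = new ArrayList<>();
        call(sorted, 0, events);
        return events;
    }

    /** Each advisor runs its before step, delegates to the rest of the chain, then runs its after step. */
    private static void call(List<NamedAdvisor> chain, int index, List<String> events) {
        if (index == chain.size()) {
            events.add("model call");
            return;
        }
        NamedAdvisor current = chain.get(index);
        events.add("before " + current.name());
        call(chain, index + 1, events);
        events.add("after " + current.name());
    }

    public static void main(String[] args) {
        List<NamedAdvisor> advisors = List.of(
                new NamedAdvisor("logger", 2),
                new NamedAdvisor("memory", -1000), // assumed value, lower than the other two
                new NamedAdvisor("tokens", 1));
        trace(advisors).forEach(System.out::println);
    }
}
```

Because each advisor wraps the remainder of the chain, the after steps unwind in the opposite order of the before steps: memory, tokens, logger on the way in, and logger, tokens, memory on the way out.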
One last aspect can be observed in the logs. The snippet below was captured after the latter prompt was sent.

Plain Text

    INFO c.h.t.controller.ChatController - USER: Provide a short info for this vendor.
    DEBUG c.h.t.advisor.TokenUsageAdvisor - Request: 4 messages ~ 41 estimated tokens.
    INFO c.h.t.advisor.MessageLoggerAdvisor - Tools: [get_vendor_information, get_paid_invoices_count, get_invoices_by_pattern_on_number, get_paid_invoices_total_amount]
    INFO c.h.t.advisor.MessageLoggerAdvisor - Request:
    {"messageType":"SYSTEM","metadata":{"messageType":"SYSTEM"},"text":"You are a helpful Telecom AI assistant. Provide short, meaningful answers."}
    {"messageType":"USER","metadata":{"messageType":"USER"},"media":[],"text":"What's the vendor of the invoices having 'vdf' in their number?"}
    {"messageType":"ASSISTANT","metadata":{"role":"ASSISTANT","messageType":"ASSISTANT","refusal":"","finishReason":"STOP","index":0,"annotations":[],"id":"chatcmpl-DVHi80AIaMxjiCaIFkr5hk4IUBGL5"},"toolCalls":[],"media":[],"text":"Vodafone."}
    {"messageType":"USER","metadata":{"messageType":"USER"},"media":[],"text":"Provide a short info for this vendor."}
    DEBUG i.m.client.LifecycleInitializer - Joining previous initialization
    DEBUG i.m.spec.McpClientSession - Sending message for method tools/call

Four messages: the chat memory advisor has already injected the prior turns by the time the logger sees the prompt. That's exactly the contract: the message history is in the prompt before later advisors run.
When the response comes back, the order reverses:

Plain Text

    DEBUG i.m.spec.McpClientSession - Received response: JSONRPCResponse[jsonrpc=2.0, id=ab8da66b-2, result={content=[{type=text, text=Specializes in cloud services.}], isError=false}, error=null]
    DEBUG i.m.c.t.HttpClientStreamableHttpTransport - SendMessage finally: onComplete
    DEBUG i.m.c.t.HttpClientStreamableHttpTransport - SSE connection established successfully
    INFO c.h.t.advisor.MessageLoggerAdvisor - Response:
    {"messageType":"ASSISTANT","metadata":{"role":"ASSISTANT","messageType":"ASSISTANT","refusal":"","finishReason":"STOP","index":0,"annotations":[],"id":"chatcmpl-DVHj4OdhPIYHi6yrAYkhSp8uqFQMO"},"toolCalls":[],"media":[],"text":"Vodafone — specializes in cloud services."}
    INFO c.h.t.advisor.TokenUsageAdvisor - Current tokens -
    Prompt: 542 Completion: 298 Total: 840
    INFO c.h.t.advisor.TokenUsageAdvisor - Accumulated tokens -
    Prompt: 1235 Completion: 659 Total: 1894
    INFO c.h.t.controller.ChatController - ASSISTANT: Vodafone — specializes in cloud services.

The logger gets first crack at the response, then the token advisor accumulates the usage.

The UI of the conversation looks as follows, where the total tokens are displayed as well.

If the user presses the Clear button, the token counter is reset and the chat memory for the current conversation is erased (see the clearConversation() method above). If the prompt 'Give me a few details about the vendor' is sent afterwards, the LLM is unable to respond, because the earlier turns that identified the vendor are no longer part of the prompt.

To wrap up: three advisors, three concerns kept out of the business code. Memory turns a stateless chat into a stateful one. Token tracking exposes real and estimated cost. Message logging gives you a tape recorder for the times when the model surprises you. The interface is the same in all three cases (before, advance the chain, after), and Spring AI does the orchestration. The same pattern naturally extends: prompt rewriting, content moderation, retry-with-backoff, output validation.
Anywhere you'd reach for an aspect in a regular Spring service, an advisor is the right shape on the AI side.

Going to Production

The POC developed so far is a good start for understanding the concepts behind such an integration, so that real production applications can then be built and deployed. For such a scenario, several recommendations are worth taking into account. First, one should be aware of at least a few production 'ilities' such as security, scalability, and observability, and also consider performance. Second, ready-to-use MCP servers should not be trusted blindly when they are integrated, so that no new supply chain risks are created.

Security aspects were already discussed in this tutorial. The communication was secured with API keys, although the preferred approach in a production environment is OAuth 2.0. Either way, Spring Security is very helpful when both web applications and MCP client-server communication need to be secured, and it is handy to integrate into a product that already uses Spring. Additionally, for the data stored in the databases and the conversations held, data encryption should be leveraged, as most vendors offer it transparently.

Regarding scalability, every time an LLM or an SQL database is called via an HTTP request, the calling thread is blocked on IO. Beginning with Java 21, virtual threads are available, and they significantly improve the scalability of IO-bound services.

When it comes to observability, keeping an eye on the system resources is always recommended. All requests to an LLM have a cost, either in money or in complexity. Spring Boot provides the actuator metrics endpoint out of the box (http://localhost:8080/actuator/metrics), which offers a great deal of insight, including metrics related to token consumption. These metrics can then be forwarded via Micrometer to a time-series database and monitored via dashboards, which significantly increases visibility.

As for performance in general, GraalVM is a great option. Once the SDK is set up and available, applications can be turned into GraalVM native images, packaged as Docker images, and then run in the cloud (Kubernetes, CloudFoundry, etc.) or in a virtual machine emulating Linux, typically with faster startup and a lower memory footprint.

Final Thoughts

AI is reshaping software development: stay informed and up to date, adapt, and use it.

Regarding MCP, it acts as a universal adapter and allows AI assistants to securely access and interact with external systems while maintaining a consistent interface. With MCP, AI development is not fragmented anymore; LLM strengths are applied to real data, and useful insights can be drawn from it.

This tutorial is a good starting point that showcases how one can start helping users benefit from a cohesive system where individual components integrate seamlessly to deliver meaningful results. Then, imagination can fly freely, as many ideas are now just a few "words" away from being put into practice by wisely using AI as a tool that magnifies our existing skills.

In the end, I would like to conclude with my brief takeaways. No Python switch is needed; continue using Java and Spring, as they have already proven to be good candidates for building production-ready software. Moreover, Spring AI is production-ready as well if we design responsibly. Last but not least, embrace the Embabel Agent Framework, which allows implementing agentic flows on the JVM that seamlessly mix LLM-prompt interactions with code, and which sketches the path towards developing agents as part of an enterprise ecosystem.

Resources

[1] The source code for the Spring AI Telecom Assistant
[2] asentinel-orm project
[3] MCP Inspector
Advanced Error Handling and Retry Patterns in Enterprise REST Integrations


By Anil Guntupalli
Enterprise REST integrations rarely fail in a clean, binary way. The dominant failure modes are usually partial and ambiguous: a socket closes after a downstream system commits, a gateway returns a timeout while the target service is still processing, a throttling layer asks for a pause, or a dependency becomes slow enough that waiting callers begin to exhaust threads, connections, and ports. In that environment, simplistic catch-and-retry logic is not resilience. It is uncontrolled traffic generation. Mature error handling starts by accepting that not every failure is retryable, that the HTTP protocol already exposes useful semantics for temporary overload and replay safety, and that retry logic has to cooperate with circuit breaking, fallback paths, and telemetry rather than act on its own.

Failure Semantics Before Retry

A robust retry policy begins with failure classification, not with a retry counter. Temporary transport failures, selected timeout conditions, and explicit server-side signals such as 503 Service Unavailable and 429 Too Many Requests are fundamentally different from validation, authorization, or contract violations. 503 is explicitly defined as a temporary inability to handle the request, potentially accompanied by Retry-After, while 429 represents rate limiting and may also carry a Retry-After value. By contrast, retrying an invalid request usually only repeats the same defect. Microsoft's retry guidance makes the same distinction: transient faults are worth retrying after a delay, while non-transient faults should be surfaced and handled as errors.

HTTP method semantics also matter more than most retry interceptors admit. RFC 9110 defines safe methods as read-only and idempotent methods as those whose intended effect is the same whether one request arrives or many.
It explicitly permits automatic retries for idempotent methods after a communication failure, but advises against automatic retries for non-idempotent methods unless the client has another way to know the action is safe to replay or to prove that the original request was never applied. That is the reason payment capture, shipment reservation, and account mutation flows need business idempotency keys or conditional requests, not just a library annotation. For update-heavy integrations, 428 Precondition Required, If-Match, and 412 Precondition Failed provide a standards-based path to prevent lost updates and make recovery from ambiguous failures safer.

Timeouts belong in the same discussion because a retry without a timeout is effectively an admission that the caller is willing to hold scarce resources indefinitely. The AWS Builders' Library notes that long waits tie up memory, threads, connections, ephemeral ports, and other limited resources, and that timeouts set too low can also create cascading retry traffic. In practice, the retry policy and the timeout budget are the same control surface viewed from different angles. If the timeout is unbounded, retries arrive too late to be useful. If retries are unbounded, a timeout only delays the storm.

Making HTTP Responses Actionable

Once the retry boundary is defined, error payloads need to become machine-actionable. RFC 9457 standardizes the fields that matter: type, title, status, detail, and instance. The specification is especially useful because it separates a human-readable explanation from a machine-readable classification. The detail field is intended to help explain the specific occurrence and is not meant to be parsed for program logic; machine consumers should rely on type and well-defined extension members instead. Spring's ProblemDetail maps directly to this model and supports non-standard properties through an extension map that can be rendered as top-level JSON.
That gives upstream services a clean way to expose retry hints, domain error codes, and correlation information without forcing clients to scrape message strings.

That structure belongs at the client boundary, where HTTP details are translated once into domain-specific exceptions. Spring's synchronous RestClient is well-suited to this because it allows custom status handlers rather than forcing every 4xx into the same exception path.

Java

    private ShipmentResponse reserveShipment(ShipmentCommand command) {
        return restClient.post()
                .uri("/shipments/reservations")
                .header("Idempotency-Key", command.requestId())
                .body(command)
                .retrieve()
                // Throttling and temporary unavailability: transient, may carry a backoff hint
                .onStatus(status -> status.value() == 429 || status.value() == 503 || status.value() == 504,
                        (request, response) -> {
                            var retryAfter = response.getHeaders().getFirst("Retry-After");
                            throw new TransientUpstreamException("shipping-api", retryAfter);
                        })
                // Remaining client errors: terminal, never retried
                .onStatus(HttpStatusCode::is4xxClientError,
                        (request, response) -> {
                            throw new NonRetryableUpstreamException("shipping-api");
                        })
                .body(ShipmentResponse.class);
    }

This boundary keeps the retry policy honest. Throttling and temporary unavailability become explicit transient exceptions that can carry backoff hints, while semantic client errors become immediately terminal. The idempotency key on the outbound write does not make every POST automatically safe, but it creates the contract required for the upstream side to deduplicate repeated attempts when replay becomes necessary after a timeout or dropped connection. That is substantially safer than retrying blindly after any exception because the classification is now based on protocol semantics and upstream intent rather than on a generic catch block.

Backoff That Respects the Protocol

After classification comes timing. Fixed-delay retry loops are attractive because they are easy to read, but they are a poor fit for overloaded distributed systems.
Both AWS and Azure recommend pausing between attempts and increasing the delay because immediate retries often land while the dependency is still unhealthy. AWS adds the deeper operational point: when many clients retry in lockstep, recovery traffic becomes a synchronized burst, which is exactly why jitter matters. Azure's retry-storm guidance makes the operational rule even more direct: retry attempts and total duration have to be limited, and the Retry-After header must be honored when it is sent. Retry-After can be either a relative number of seconds or an absolute HTTP date, so treating it as a magic integer is incomplete protocol handling.

Resilience4j is useful here because its retry model is more expressive than a simple fixed wait. The library supports maxAttempts, waitDuration, retryOnResultPredicate, exception-based selection, and an intervalBiFunction that can compute the next delay from the attempt count and either a result or an exception.

Java

    RetryConfig retryConfig = RetryConfig.custom()
            .maxAttempts(4)
            .retryOnException(ex -> ex instanceof ResourceAccessException
                    || ex instanceof TransientUpstreamException)
            .ignoreExceptions(NonRetryableUpstreamException.class, ValidationException.class)
            // The interval function returns the next wait time in milliseconds
            .intervalBiFunction((attempt, either) -> {
                var ex = either.isLeft() ? either.getLeft() : null;
                // Prefer the server's hint when one was provided
                if (ex instanceof TransientUpstreamException t && t.retryAfter() != null) {
                    return t.retryAfterDuration().toMillis();
                }
                // Otherwise: exponential delay capped at 3 seconds, plus random jitter
                var base = Math.min(200L * (1L << (attempt - 1)), 3000L);
                var jitter = ThreadLocalRandom.current().nextLong(0, 250);
                return base + jitter;
            })
            .failAfterMaxAttempts(true)
            .build();

This pattern does two things that enterprise integrations often miss. First, it respects protocol hints when the server provides them. Second, when the server does not provide them, it falls back to bounded exponential delay with jitter instead of immediate replay. That preserves throughput during brief faults without turning one failed request into a tight loop.
It also keeps business semantics intact by excluding validation failures and other known terminal conditions from the retry path entirely.

Retry With Circuit Breaking and Fallbacks

Retry should never be the only protection layer around a dependency. Azure's circuit breaker guidance draws the distinction clearly: retry assumes the operation may succeed soon, while a circuit breaker stops calls that are likely to fail and allows the system to probe for recovery later. Resilience4j implements this with count-based or time-based sliding windows and explicit breaker states, which makes the breaker a statistical decision point rather than a hardcoded timeout reaction. In practice, retries belong inside a bounded window, and the circuit breaker decides when that window should close early because the failure is no longer transient.

For annotation-driven Spring services, that composition stays concise as long as the fallback preserves business truth. A fallback should not fabricate success merely to keep the API green. A degraded but truthful state is a better contract than a false positive.

Java

    @CircuitBreaker(name = "paymentGateway", fallbackMethod = "deferCapture")
    @Retry(name = "paymentGateway")
    public PaymentResult capture(PaymentCommand command) {
        return paymentGateway.capture(command);
    }

    // Fallback: record the uncertain capture for asynchronous reconciliation
    private PaymentResult deferCapture(PaymentCommand command, Exception ex) {
        outbox.save(new PendingCapture(command.paymentId(), command.requestId(), ex.getMessage()));
        return PaymentResult.pending(command.paymentId());
    }

The important detail is not the annotation pair itself, but the semantics of the fallback. Writing an outbox record or reconciliation task acknowledges that the payment state is uncertain and that recovery will continue asynchronously. Returning pending instead of captured prevents downstream systems from treating a degraded path as a confirmed business success. That is the difference between fault tolerance and silent data corruption.
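The retry configuration earlier relies on a retryAfterDuration() helper that the article does not show. As a hedged illustration of the point that Retry-After may be either a number of seconds or an HTTP date, here is one way such a conversion could look in plain Java. RetryAfterParser and its parse method are hypothetical names, and the Clock parameter is an assumption added to make the date branch testable.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

/** Hypothetical helper: converts a Retry-After header value into a wait duration. */
class RetryAfterParser {

    /**
     * Accepts either delay-seconds ("120") or an HTTP date
     * ("Wed, 21 Oct 2015 07:28:00 GMT"). Returns empty when the value is
     * missing or cannot be interpreted, so the caller can fall back to its own backoff.
     */
    static Optional<Duration> parse(String headerValue, Clock clock) {
        if (headerValue == null || headerValue.isBlank()) {
            return Optional.empty();
        }
        String value = headerValue.trim();

        // Form 1: a non-negative integer number of seconds
        if (value.chars().allMatch(c -> c >= '0' && c <= '9')) {
            try {
                return Optional.of(Duration.ofSeconds(Long.parseLong(value)));
            } catch (NumberFormatException e) {
                return Optional.empty(); // too large to represent
            }
        }

        // Form 2: an absolute HTTP date; the delay is measured from "now"
        try {
            Instant retryAt = ZonedDateTime
                    .parse(value, DateTimeFormatter.RFC_1123_DATE_TIME)
                    .toInstant();
            Duration delay = Duration.between(clock.instant(), retryAt);
            // A date in the past means the caller may retry immediately
            return Optional.of(delay.isNegative() ? Duration.ZERO : delay);
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }
}
```

Returning an empty Optional for unparseable values lets the retry policy fall back to its own bounded exponential delay rather than guessing. In production use, the resulting duration would usually also be capped so that a very large server hint cannot exceed the caller's overall retry budget.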
Reactive Flows and the Hidden Cost of Convenience

Reactive clients make retry composition even easier, which is precisely why strict filtering matters. Spring's WebClient maps responses with status codes of 400 and above to exceptions by default, and onStatus allows those responses to be reclassified. Reactor then adds a retry DSL where Retry.backoff is preconfigured for exponential backoff with jitter. The result is elegant, but elegance is dangerous when it hides accidental replay of all failures instead of only transient ones.

Java

    public Mono<InventorySnapshot> fetchInventory(String sku) {
        return webClient.get()
                .uri("/inventory/{sku}", sku)
                .retrieve()
                // Reclassify throttling and unavailability as transient failures
                .onStatus(status -> status.value() == 429 || status.value() == 503,
                        response -> response.bodyToMono(ProblemDetail.class)
                                .defaultIfEmpty(ProblemDetail.forStatus(response.statusCode()))
                                .map(problem -> new TransientUpstreamException(problem.getDetail())))
                .bodyToMono(InventorySnapshot.class)
                // Retry only what was explicitly classified as transient
                .retryWhen(Retry.backoff(3, Duration.ofMillis(250))
                        .filter(TransientUpstreamException.class::isInstance));
    }

The critical move in this style is the filter. Without it, every WebClientResponseException becomes retryable, which means malformed requests, unauthorized access, and contract defects start looping through the same pipeline as a temporary overload. With the filter in place, the reactive chain remains expressive without becoming indiscriminate. The same principle applies to result-based retries as well: only states that are explicitly modeled as transient should flow back into the retry companion.

Visibility as Part of the Contract

An enterprise retry policy that cannot be observed is effectively untestable in production. Spring's observability support is built around Micrometer observations, and Resilience4j provides a Micrometer module for its fault-tolerance primitives. That combination makes it possible to expose retry counts, breaker state, final outcome, and request timing in the same telemetry fabric.
At the protocol level, RFC 9457's instance field provides a stable error occurrence identifier that can also be propagated into logs and traces. Once those signals exist, a slow integration no longer appears as a single long call; it becomes visible as one business request that triggered multiple upstream attempts before succeeding or degrading.

Conclusion

Advanced error handling in enterprise REST integrations is not built from retries alone. It is built from protocol-aware classification, explicit replay safety, structured error payloads, bounded backoff with jitter, circuit breaking for persistent faults, truthful fallbacks, and telemetry that exposes every extra attempt. HTTP already provides essential semantics for temporary overload, rate limiting, and conditional updates, while Spring, Reactor, and Resilience4j provide the implementation hooks needed to preserve those semantics in code. When those layers are combined deliberately, retries stop being a reflex and become a controlled recovery strategy that protects both correctness and system stability.
Beyond Manual Annotation: Engineering Self-Correcting Pseudo-Labeling Pipelines
By Harshith Narasimhan Srivatsa
Build a GitHub Slack Bot With AWS Bedrock and MCP, Part 2
By Sangharsh Agarwal
Building Threat Intelligence Pipelines Using Python, APIs, and Elasticsearch
By Krishnaveni Musku
Build a GitHub Slack Bot With AWS Bedrock and MCP, Part 1

I set out to build a simple Slack bot that could answer questions about our GitHub repository — open bugs, pending PRs, and recent releases. Straightforward enough. It turned into 400 lines of API glue code.

When I asked Claude, ChatGPT, Gemini, and several coding assistants for architecture advice, they all converged on the same conventional pattern:

What every AI suggested | What it means in practice
1. Slack receives the mention | Write a GitHub REST client
2. Bot calls GitHub REST API | Routing logic per question type
3. Feed response into Claude/GPT | Pagination per endpoint
4. Model formats the answer | Maintain API versions
5. Bot posts back to Slack | Repeat for every new data source

This works. I built it. Three days, 400 lines of API client code, and it answered perhaps 60% of the questions my team asked. Questions like "Are any critical bugs related to PRs merged this week?" required custom correlation logic across multiple endpoints. Every new question type meant new code. Adding error monitoring as a second data source meant a separate integration entirely.

After digging deeper into how AWS Bedrock handles tool use, I discovered the Model Context Protocol. I rebuilt the same bot in an afternoon — 150 lines, answering a far wider range of questions, and adding a new data source is a handful of lines in a single function. This article explains what changed and why it matters.

The core insight: don't build an API client that feeds a model. Build a model that calls tools. These are fundamentally different architectures.

The Architecture: Three Layers, One Loop

The system is built in three layers.
Each has exactly one responsibility and hands off cleanly to the next:

Slack (Socket Mode)
    User types @mention → question received
        ↓ question passed to agent
AWS Bedrock — Claude (Agent Loop)
    Reason → decide tools → call → read results → repeat
        ↓ tool calls routed via registry
MCP Servers (GitHub + any other)
    40+ tools per server — issues, PRs, releases, code search…
        ↓ tool results → reasoning → formatted answer → Slack

Slack receives the @mention and passes the question down. Bedrock runs the agent loop — Claude reasons about which GitHub MCP tools to call, executes them, reads the results, and loops until it has enough data to answer. The tool registry routes each call to the correct MCP server automatically. The answer travels back up to Slack.

Before vs. After: A Real Question

To understand why this matters, consider a specific question a developer might ask in Slack: "Are any critical bugs related to PRs merged this week?"

On the surface, this seems simple. But answering it correctly requires data from two separate GitHub API endpoints — the issues API for bugs, and the pull requests API for recent merges — and then correlation logic to match issue references in PR descriptions. If you are writing a traditional bot, you need to anticipate this question, write the two API calls, handle pagination on each, and write the join logic. Now imagine a dozen different question types. Each one is a new coding task.

Traditional approach | MCP approach
1. Search GitHub for critical bugs | Claude calls list_merged_prs (this week)
2. Search for PRs merged this week | Claude calls search_issues (critical bugs)
3. Write correlation logic across both | Claude calls get_issue for each candidate
4. Handle pagination on each endpoint | Claude cross-references links in PR bodies
5. Feed combined data to model to format | Claude returns correlated, formatted answer
6. New question? Write new logic. | New question? Model figures out new tools.
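To make the "agent loop plus tool registry" idea concrete, here is a minimal sketch in plain Java. It is not the bot's actual code and uses no Bedrock or MCP SDK: ModelStep, Model, ToolRegistry, and AgentLoop are hypothetical names invented for illustration, and the model is a stand-in interface. It only shows the shape of the loop: the model picks a tool, the registry routes the call, the result is fed back, and a step cap stops a loop that never converges.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical types for illustration only; none of these come from Bedrock or an MCP SDK.

/** What the model decides next: call a named tool, or return the final answer. */
record ModelStep(String toolName, String argument, String finalAnswer) {
    static ModelStep callTool(String toolName, String argument) {
        return new ModelStep(toolName, argument, null);
    }

    static ModelStep answer(String text) {
        return new ModelStep(null, null, text);
    }

    boolean isFinal() {
        return finalAnswer != null;
    }
}

/** Stand-in for the model: it sees the question plus every tool result gathered so far. */
interface Model {
    ModelStep next(String question, List<String> toolResults);
}

/** Maps tool names to implementations, so the loop never routes on the question text. */
final class ToolRegistry {
    private final Map<String, Function<String, String>> tools = new HashMap<>();

    void register(String name, Function<String, String> tool) {
        tools.put(name, tool);
    }

    String call(String name, String argument) {
        Function<String, String> tool = tools.get(name);
        // An unknown tool is reported back to the model instead of crashing the loop.
        return tool == null ? "error: unknown tool " + name : tool.apply(argument);
    }
}

/** Reason, call a tool, read the result, repeat, with a cap on the number of tool calls. */
final class AgentLoop {
    private final Model model;
    private final ToolRegistry registry;
    private final int maxSteps;

    AgentLoop(Model model, ToolRegistry registry, int maxSteps) {
        this.model = model;
        this.registry = registry;
        this.maxSteps = maxSteps;
    }

    String answer(String question) {
        List<String> results = new ArrayList<>();
        for (int step = 0; step < maxSteps; step++) {
            ModelStep next = model.next(question, results);
            if (next.isFinal()) {
                return next.finalAnswer();
            }
            String output = registry.call(next.toolName(), next.argument());
            results.add(next.toolName() + " -> " + output);
        }
        return "Stopped after " + maxSteps + " tool calls without a final answer.";
    }
}
```

In this sketch, adding a data source is one more register call; the loop itself does not change, which is the property the comparison above attributes to the MCP approach.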
What makes the MCP approach powerful is not just the line count — it is what the model is doing. Claude receives the full JSON Schema for every available GitHub tool at startup. When the question arrives, it reasons over those tool descriptions, selects the relevant ones, calls them in the right order, and then reasons over the combined results to produce an answer. It does not need to be told: "for bug questions, use search_issues". It reads the tool description and figures that out. The result is that the model can handle questions you never anticipated. "Show me PRs merged this week still linked to open bugs" — a slightly different framing of the same question — works without any code changes, because Claude adapts its tool selection to the new phrasing. Example Slack response: Plain Text :rotating_light: *Critical Bugs Linked to Recent PRs* • <https://github.com/org/repo/issues/1234|#1234> — Payment processing failure (linked to <https://github.com/org/repo/pull/5678|PR #5678>, merged Apr 14) • <https://github.com/org/repo/issues/1290|#1290> — Auth token timeout on mobile (linked to <https://github.com/org/repo/pull/5691|PR #5691>, merged Apr 15) Summary: 2 critical bugs found. Both linked to PRs merged this week. 6 tool calls: list merged PRs, search critical issues, get_issue per candidate. What the Model Context Protocol Does MCP is an open standard that lets AI models discover and call external tools through a uniform interface. Every MCP server exposes a tools/list endpoint returning every available action as a full JSON Schema. The model loads these at startup and reasons over them autonomously. Your application code never routes a single query. GitHub's official MCP server at api.githubcopilot.com/mcp/ exposes 40+ tools — issues, PRs, releases, code search — and a single GitHub token is all the authentication required. The shift is architectural, not cosmetic. The conventional model is a formatter — it receives data you fetched. 
The MCP model is a reasoning agent — it decides what to fetch, fetches it, and synthesizes the results. The first scales with the API code you write. The second scales with the MCP ecosystem.

Why SRE and Platform Teams Should Care

This bot started as a developer productivity tool. But when our SRE and platform engineering teams reviewed the architecture, they saw something broader: a pattern that could eliminate an entire category of operational toil. Platform teams spend considerable time maintaining integrations — every API change means updating a client, every new data source means a new integration project. The MCP pattern changes that calculus entirely.

  • Integration toil. MCP server owners maintain compatibility with their own APIs. When GitHub updates its REST API, GitHub's MCP server absorbs that change. You own zero API client code.
  • API drift. Traditional bots silently degrade when response schemas change. With MCP, the server owner tracks those changes — your bot keeps working.
  • Correlation complexity. Linking deploys to errors, PRs to bugs, incidents to changesets — this logic is brittle in code and breaks constantly. Models do this naturally by reasoning across tool results in context.
  • Platform rebuilds for new capabilities. Each new MCP server extends the bot without touching the agent loop. The loop is infrastructure. The servers are plugins. New team joins? New tool added? It is configuration, not development.

The compounding effect matters most: every new MCP server registered is immediately available for any question the model asks. Traditional integrations accumulate glue code. MCP integrations accumulate capabilities.

Conclusion

The conventional approach to building AI-powered developer tools is not wrong — it works, and many teams run it successfully. But it carries a hidden cost: every new capability requires new code, every new data source requires a new integration, and every API change requires maintenance. Over time, that cost compounds.
The Model Context Protocol eliminates that cost. By exposing tools through a uniform interface that the model discovers at startup, MCP shifts the integration burden away from your codebase and onto the ecosystem. The model reasons about which tools to call. You reason about what questions to answer. Part 1 has covered the why — the architectural distinction, the before/after comparison on a real question, and why this matters especially for SRE and platform teams. Part 2 puts it into practice with the complete implementation, step-by-step setup, and production lessons that make it reliable for daily use. Continue to Part 2: Implementation, Setup, and Production Patterns. Full project code on GitHub: https://github.com/sangharshcs/slack-github-mcp-bot.

By Sangharsh Agarwal
Securing the AI Host: Spring AI MCP Server Communication With API Keys

Abstract

This is a continuation of the first article in this series, Building a Spring AI Assistant with MCP Servers: A Step-by-Step Tutorial, and describes how one may address a serious concern when thinking of going from prototype to production — security.

The Problem

The MCP specification recommends that MCP servers using HTTP as their transport layer be secured with OAuth 2.0 access tokens. In practice, plenty of teams don't have the surrounding infrastructure — an authorization server, token introspection, and operational maturity — ready when they start exposing internal tools to an AI assistant. But the traffic still needs to be authenticated.

This article walks through a simpler scheme that fits that gap: per-server API keys carried in a custom HTTP header. The MCP server only authorizes requests that present a valid key; the MCP client analyzes each outbound request at runtime and attaches the right header for the right destination.

We'll use Spring AI 1.1.4, MCP Spring Security 0.1.5, and Spring Security on Java 25.

The setup involves three applications:

  • telecom-assistant – the AI host and MCP client (port 8080)
  • invoice-mcp-server – exposes invoice tools, keeps API keys in PostgreSQL (port 8081)
  • vendor-mcp-server – exposes vendor tools, keeps a single API key in memory (port 8082)

Two servers, two different storage strategies, on purpose - to show both ends of the spectrum. Every MCP server has its own API key id and secret. The picture below sketches the flow and the requirements to accomplish that.

To be able to follow along, switch to the 2-main branch of the designated GitHub repository. Upon resolving the TODOs in there, this goal will have been fulfilled.

Securing the Vendor Service (In-Memory Keys)

TODO 1. This is the simpler case.
Start by adding the security dependencies in pom.xml: XML <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency> <dependency> <groupId>org.springaicommunity</groupId> <artifactId>mcp-server-security</artifactId> <version>0.1.5</version> </dependency> TODO 2. Since the API keys are stored in memory in this case, they are declared in the application.properties file, still as environmental variables. Properties files api.key.id = ${API_KEY_ID} api.key.secret = ${API_KEY_SECRET} TODO 3. The main aspect regarding this enhancement is the security configuration. In this regard, the below @Configuration class is added. Java @EnableWebSecurity @Configuration public class SecurityConfig { @Value("${api.key.id}") private String apiKeyId; @Value("${api.key.secret}") private String apiKeySecret; @Bean ApiKeyEntityRepository<ApiKeyEntity> apiKeyRepository() { return new InMemoryApiKeyEntityRepository<>( List.of(ApiKeyEntityImpl.builder() .name("API key") .id(apiKeyId) .secret(apiKeySecret) .build())); } @Bean SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception { return http.authorizeHttpRequests(auth -> auth.anyRequest().authenticated()) .with(McpApiKeyConfigurer.mcpServerApiKey(), apiKeyConfig -> apiKeyConfig.apiKeyRepository(apiKeyRepository()) .headerName("vendor-x-api-key")) .build(); } } A single ApiKeyEntity instance is constructed and stored as part of an InMemoryApiKeyEntityRepository. Then, when the SecurityFilterChain is built, a SecurityConfigurerAdapter is applied and an McpApiKeyConfigurer is used via which two concerns are addressed. On one hand, the expected security header name is set — vendor-x-api-key, while on the other, the repository that stores the server API key. At this point, the MCP server is secured. 
In order to communicate successfully, an MCP client must send HTTP requests that contain the required header, which has the following form:

Plain Text
    "vendor-x-api-key": [api-key-id].[api-key-secret]

where api-key-id and api-key-secret are replaced with the values configured above.

To test this functionality, the MCP Inspector [Resource 3] is used again; before connecting to the running server, the authentication data is configured by setting the vendor-x-api-key header to the known id.secret value.

Securing the Invoice Service (Keys in PostgreSQL)

Switching to the invoice-mcp-server, the enhancements here are a bit more complex as the API keys are stored in an external repository.

TODO 4. Again the security dependencies are added in the pom.xml file, as before.

TODO 5. As API keys are stored in the database, more exactly in the ServerApiKeys table, a mapping entity is created.

Java
    @Table("ServerApiKeys")
    public class ServerApiKey {

        public static final String COL_SERVER = "Server";
        public static final String COL_KEY_ID = "KeyId";
        public static final String ON_CONFLICT_CLAUSE = String.format("(%s,%s)", COL_SERVER, COL_KEY_ID);

        @PkColumn("Id")
        private int id;

        @Column(COL_SERVER)
        private String server;

        @Column(COL_KEY_ID)
        private String keyId;

        @Column("KeySecret")
        private String keySecret;

        ...
    }

As the Asentinel ORM library is already present on this module’s classpath, it is used to manage these entities; thus, the class is decorated with its specific annotations.

TODO 6. Just as previously done for the vendor server, the security configuration needs an ApiKeyEntityRepository. The approach here is more general: the interface is implemented directly, in a manner suited to the database-backed storage.
Java public class DbApiKeyEntityRepository implements ApiKeyEntityRepository<DbApiKeyEntityRepository.InvoiceApiKeyEntity> { private final OrmOperations orm; public DbApiKeyEntityRepository(OrmOperations orm) { this.orm = orm; } @Override public InvoiceApiKeyEntity findByKeyId(@NonNull String keyId) { return orm.newSqlBuilder(ServerApiKey.class) .select() .where() .column(ServerApiKey.COL_SERVER).eq("invoice-mcp").and() .column(ServerApiKey.COL_KEY_ID).eq(keyId) .execForOptional() .map(serverApiKey -> new InvoiceApiKeyEntity(keyId, serverApiKey.getKeySecret())) .orElse(null); } } As every record (API key) in the table is uniquely identified by Server and KeyId, whenever a request is received, the repository checks it and returns an implementation of the ApiKeyEntity interface, in our case Java public static final class InvoiceApiKeyEntity implements ApiKeyEntity { private final String id; @Nullable private String secret; private InvoiceApiKeyEntity(String id, @Nullable String secret) { this.id = id; this.secret = secret; } @Override public String getId() { return id; } @Override public @Nullable String getSecret() { return secret; } @Override public void eraseCredentials() { this.secret = null; } @Override public InvoiceApiKeyEntity copy() { return new InvoiceApiKeyEntity(id, secret); } } built from the database entity upon retrieval. It is a good practice to keep the secret of a ServerApiKey entity encoded in the database. In this tutorial, the default one — bcrypt — is used. To check the repository, the following simple integration test is used. 
Java @SpringBootTest @Transactional class DbApiKeyEntityRepositoryTest { private DbApiKeyEntityRepository apiKeyRepository; @Autowired private OrmOperations orm; private final PasswordEncoder passwordEncoder = PasswordEncoderFactories.createDelegatingPasswordEncoder(); @BeforeEach public void setUp() { apiKeyRepository = new DbApiKeyEntityRepository(orm); } @Test void provisionServerApiKey() { ServerApiKey serverApiKey = new ServerApiKey(); serverApiKey.setServer("invoice-mcp"); serverApiKey.setKeyId("api-key-id"); serverApiKey.setKeySecret(passwordEncoder.encode("api-key-secret")); orm.upsert(serverApiKey, PostgresJdbcFlavor.UPSERT_CONFLICT_PLACEHOLDER, ServerApiKey.ON_CONFLICT_CLAUSE); DbApiKeyEntityRepository.InvoiceApiKeyEntity apiKey = apiKeyRepository.findByKeyId(serverApiKey.getKeyId()); Assertions.assertNotNull(apiKey); Assertions.assertEquals(serverApiKey.getKeyId(), apiKey.getId()); Assertions.assertEquals(serverApiKey.getKeySecret(), apiKey.getSecret()); } } TODO 7. Once this is completed, the security configuration is set up, just as before. The only difference is that the previously created ApiKeyEntityRepository repository implementation is used and not an in-memory one this time. Java @Configuration @EnableWebSecurity public class SecurityConfig { private OrmOperations orm; @Autowired public void setOrm(OrmOperations orm) { this.orm = orm; } @Bean ApiKeyEntityRepository<DbApiKeyEntityRepository.InvoiceApiKeyEntity> apiKeyRepository() { return new DbApiKeyEntityRepository(orm); } @Bean SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception { return http.authorizeHttpRequests(auth -> auth.anyRequest().authenticated()) .with(McpApiKeyConfigurer.mcpServerApiKey(), apiKeyConfig -> apiKeyConfig.apiKeyRepository(apiKeyRepository()) .headerName("invoice-x-api-key")) .build(); } } At this point, the invoice-mcp server is secured as well, it can be checked with the MCP Inspector. 
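As a side note on the [api-key-id].[api-key-secret] header format used by both servers, the sketch below shows one way such a value could be split and checked. It is an illustration only, not the mcp-server-security library's implementation: ApiKeyCredentials and ApiKeyCheck are hypothetical names, splitting on the first dot is an assumption, and the secrets are compared as plain strings for brevity, whereas this tutorial stores them bcrypt-encoded and leaves verification to the library.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper types; the real validation lives in the security library.

/** Parsed form of an "<id>.<secret>" header value. */
record ApiKeyCredentials(String id, String secret) {

    /** Splits on the first dot (an assumption); empty if either part is missing. */
    static Optional<ApiKeyCredentials> parse(String headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        int dot = headerValue.indexOf('.');
        if (dot <= 0 || dot == headerValue.length() - 1) {
            return Optional.empty();
        }
        return Optional.of(new ApiKeyCredentials(
                headerValue.substring(0, dot),
                headerValue.substring(dot + 1)));
    }
}

/** Checks a header value against known keys held in memory (plain text, for brevity only). */
final class ApiKeyCheck {
    private final Map<String, String> secretsById;

    ApiKeyCheck(Map<String, String> secretsById) {
        this.secretsById = secretsById;
    }

    boolean isValid(String headerValue) {
        return ApiKeyCredentials.parse(headerValue)
                .map(credentials -> {
                    String expected = secretsById.get(credentials.id());
                    // MessageDigest.isEqual compares the bytes in constant time.
                    return expected != null && MessageDigest.isEqual(
                            expected.getBytes(StandardCharsets.UTF_8),
                            credentials.secret().getBytes(StandardCharsets.UTF_8));
                })
                .orElse(false);
    }
}
```

The point of the sketch is the failure modes: a value without a dot, with an empty id or secret, or with an unknown id is rejected before any secret comparison happens.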
Making the Client Send the Right Header to the Right Server

Both servers are locked down. Now the client needs to know that requests to http://localhost:8081/mcp-invoice should carry the invoice-x-api-key header and requests to http://localhost:8082/mcp-vendor should carry vendor-x-api-key. A clean way to encode this is a chain of responsibility of resolvers.

TODO 8. The expected API key ids and secrets for the two servers are configured in application.properties and, for convenience, read from environment variables. For simplicity, both servers read the same variables here, although a real deployment would use distinct keys for each server.

Properties files
    mcp.server.api-key.parameters.invoice.id = ${API_KEY_ID}
    mcp.server.api-key.parameters.invoice.secret = ${API_KEY_SECRET}
    mcp.server.api-key.parameters.vendor.id = ${API_KEY_ID}
    mcp.server.api-key.parameters.vendor.secret = ${API_KEY_SECRET}

They are then mapped into a @ConfigurationProperties-annotated class.

Java
    @ConfigurationProperties(McpServerApiKeyProperties.CONFIG_PREFIX)
    public class McpServerApiKeyProperties {

        public static final String CONFIG_PREFIX = "mcp.server.api-key";

        private final Map<String, ApiKeyParams> parameters = new HashMap<>();

        public Map<String, ApiKeyParams> getParameters() {
            return parameters;
        }

        public record ApiKeyParams(String id, String secret) {}
    }

TODO 9. As there are two MCP servers involved, whenever the LLM instructs the AI host that it needs to query one of them, a destination-resolving strategy applied at runtime is introduced. It’s implemented as a chain of MCP server resolvers. The common interface of this chain of responsibility is McpServerResolver. It’s generic and declares two methods as part of the contract it proposes.

Java
    public interface McpServerResolver<T> {

        Optional<T> resolve(URI uri);

        default String id() {
            return getClass().getSimpleName();
        }
    }

The central method that each implementer shall define receives the destination uri and attempts to resolve one of the available servers.
If successful, the result is further used. The second optional method has the particular scope of identifying the current resolver; it has a default implementation and might help down the line during the resolving process (logging, etc.). As the items in the chain here have a similar approach, the next part is an abstract common implementation of the above interface. Java abstract class AbstractMcpServerResolver<T> implements McpServerResolver<T> { private static final Logger log = LoggerFactory.getLogger(AbstractMcpServerResolver.class); private final McpServerResolver<T> next; protected AbstractMcpServerResolver(McpServerResolver<T> next) { this.next = next; } @Override public Optional<T> resolve(URI uri) { if (uri == null) { return Optional.empty(); } log.debug("[{}]: Checking request towards {}.", id(), uri); Optional<T> result = resolveSpecific(uri); if (result.isPresent()) { log.debug("[{}]: Resolved target endpoint {}.", id(), uri); return result; } if (next == null) { log.debug("[{}]: No next resolver configured.", id()); return Optional.empty(); } log.debug("[{}]: Target endpoint {} not resolved. Delegating to [{}].", id(), uri, next.id()); return next.resolve(uri); } protected abstract Optional<T> resolveSpecific(URI endpoint); } The particular action is to be defined by each link in the chain as part of the Optional resolveSpecific(URI endpoint) method. Here, the functionality is similar; thus, the next common implementation is enough. 
Java public class UrlMcpServerResolver extends AbstractMcpServerResolver<ApiKeyHeader> { private static final Logger log = LoggerFactory.getLogger(UrlMcpServerResolver.class); private final URI serverUri; private final ApiKeyHeader header; public UrlMcpServerResolver(McpServerResolver<ApiKeyHeader> nextResolver, String serverUrl, ApiKeyHeader header) { super(nextResolver); this.serverUri = URI.create(serverUrl); this.header = header; } @Override protected Optional<ApiKeyHeader> resolveSpecific(URI endpoint) { if (serverUri.equals(endpoint)) { log.debug("[{}]: Target endpoint {} and config URL {} match.", id(), endpoint, serverUri); return Optional.of(header); } log.debug("[{}]: Target endpoint {} and config URL {} don't match.", id(), endpoint, serverUri); return Optional.empty(); } } TODO 10. The above resolveSpecific() method decides whether the current request is towards a particular server. If successful, an ApiKeyHeader object is returned so that it can be further used. Java public record ApiKeyHeader(String name, String value) {} TODO 11. The last step is the security @Configuration class that glues together the above-created pieces. 
Java @Configuration @EnableConfigurationProperties({McpServerApiKeyProperties.class}) public class SecurityConfig { private static final Logger log = LoggerFactory.getLogger(SecurityConfig.class); public McpStreamableHttpClientProperties mcpClientProps; public McpServerApiKeyProperties mcpServerApiKeys; @Autowired public void setMcpClientProps(McpStreamableHttpClientProperties mcpClientProps) { this.mcpClientProps = mcpClientProps; } @Autowired public void setMcpServerApiKeys(McpServerApiKeyProperties mcpServerApiKeys) { this.mcpServerApiKeys = mcpServerApiKeys; } @Bean ApiKeyHeader invoiceApiKeyHeader() { var apiKey = mcpServerApiKeys.getParameters().get("invoice"); return new ApiKeyHeader("invoice-x-api-key", String.format("%s.%s", apiKey.id(), apiKey.secret())); } @Bean ApiKeyHeader vendorApiKeyHeader() { var apiKey = mcpServerApiKeys.getParameters().get("vendor"); return new ApiKeyHeader("vendor-x-api-key", String.format("%s.%s", apiKey.id(), apiKey.secret())); } @Bean McpServerResolver<ApiKeyHeader> serverResolver() { var mcpProps = mcpClientProps.getConnections(); var mcpInvoice = mcpProps.get("invoice"); var mcpVendor = mcpProps.get("vendor"); return new VendorMcpServerResolver(new InvoiceMcpServerResolver(null, String.format("%s%s", mcpInvoice.url(), mcpInvoice.endpoint()), invoiceApiKeyHeader()), String.format("%s%s", mcpVendor.url(), mcpVendor.endpoint()), vendorApiKeyHeader()); } @Bean McpSyncHttpClientRequestCustomizer requestCustomizer() { return (builder, method, endpoint, body, context) -> { log.info("MCP Client request: method={}, endpoint={}, body={}", method, endpoint, body); serverResolver() .resolve(endpoint) .ifPresent(apiKeyHeader -> builder.header(apiKeyHeader.name(), apiKeyHeader.value())); }; } } The McpServerResolver returned by serverResolver() is used by the McpSyncHttpClientRequestCustomizer to analyze the request and add the necessary security header. 
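The chain's behavior can be tried in isolation with a stripped-down sketch that has no Spring or MCP dependencies. Header and UriHeaderResolver below are hypothetical simplifications of ApiKeyHeader and UrlMcpServerResolver, written only for this illustration; the VendorMcpServerResolver and InvoiceMcpServerResolver types referenced in serverResolver() are not shown in the listing above, and the sketch does not depend on them. One detail it makes visible is that matching relies on URI.equals, so a request whose URI differs from the configured one (for example by a trailing slash) resolves to nothing and gets no header.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical, dependency-free simplification of the resolver chain for illustration.

/** Name and value of the security header to attach. */
record Header(String name, String value) {}

/** One link in the chain: matches a single server URI, otherwise defers to the next link. */
final class UriHeaderResolver {
    private final URI serverUri;
    private final Header header;
    private final UriHeaderResolver next; // null marks the end of the chain

    UriHeaderResolver(String serverUrl, Header header, UriHeaderResolver next) {
        this.serverUri = URI.create(serverUrl);
        this.header = header;
        this.next = next;
    }

    Optional<Header> resolve(URI endpoint) {
        if (endpoint == null) {
            return Optional.empty();
        }
        // Exact URI equality: a different path or a trailing slash does not match.
        if (serverUri.equals(endpoint)) {
            return Optional.of(header);
        }
        return next == null ? Optional.empty() : next.resolve(endpoint);
    }
}
```

Wiring a vendor link in front of an invoice link, mirroring the order used in serverResolver(), means every request is tested against the vendor URI first and falls through to the invoice URI only when that check fails. An unresolved endpoint is sent without any API key header, so the target server is the one that rejects it.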
Watching it Work Upon reaching this point, the MCP servers are restarted, together with the telecom-assistant. If the prompt in the screenshot below is issued, obviously, the invoice server should be queried, and the response should be received accordingly. In the AI host logs, the server resolving process can be depicted. Plain Text DEBUG i.m.c.t.HttpClientStreamableHttpTransport - Sending message JSONRPCRequest[jsonrpc=2.0, method=tools/call, id=2902fe67-2, params=CallToolRequest[name=get-invoices-by-pattern-on-number, arguments={pattern=vdf}, meta={}]] INFO c.h.t.config.SecurityConfig - MCP Client request: method=POST, endpoint=http://localhost:8081/mcp-invoice, body={"jsonrpc":"2.0","method":"tools/call","id":"2902fe67-2","params":{"name":"get-invoices-by-pattern-on-number","arguments":{"pattern":"vdf"},"_meta":{}} DEBUG c.h.t.c.r.AbstractMcpServerResolver - [VendorMcpServerResolver]: Checking request towards http://localhost:8081/mcp-invoice. DEBUG c.h.t.c.r.UrlMcpServerResolver - [VendorMcpServerResolver]: Target endpoint http://localhost:8081/mcp-invoice and config URL http://localhost:8082/mcp-vendor don't match. DEBUG c.h.t.c.r.AbstractMcpServerResolver - [VendorMcpServerResolver]: Target endpoint http://localhost:8081/mcp-invoice not resolved. Delegating to [InvoiceMcpServerResolver]. DEBUG c.h.t.c.r.AbstractMcpServerResolver - [InvoiceMcpServerResolver]: Checking request towards http://localhost:8081/mcp-invoice. DEBUG c.h.t.c.r.UrlMcpServerResolver - [InvoiceMcpServerResolver]: Target endpoint http://localhost:8081/mcp-invoice and config URL http://localhost:8081/mcp-invoice match. DEBUG c.h.t.c.r.AbstractMcpServerResolver - [InvoiceMcpServerResolver]: Resolved target endpoint http://localhost:8081/mcp-invoice. 
DEBUG i.m.c.t.HttpClientStreamableHttpTransport - Received SSE stream response, using line subscriber DEBUG i.m.spec.McpSchema - Received JSON message: {"jsonrpc":"2.0","id":"2902fe67-2","result":{"content":[{"type":"text","text":"[{\"id\":8,\"number\":\"vdf-tf-rev-1\",\"date\":\"2025-05-20\",\"vendor\":{\"id\":2,\"name\":\"Vodafone\"},\"serviceType\":{\"id\":3,\"name\":\"TollFree\"},\"status\":\"UNDER_REVIEW\",\"total\":10.44},{\"id\":7,\"number\":\"vdf-mpls-app-1\",\"date\":\"2025-05-10\",\"vendor\":{\"id\":2,\"name\":\"Vodafone\"},\"serviceType\":{\"id\":4,\"name\":\"MPLS\"},\"status\":\"APPROVED\",\"total\":80.44},{\"id\":6,\"number\":\"vdf-lo-paid-1\",\"date\":\"2025-06-10\",\"vendor\":{\"id\":2,\"name\":\"Vodafone\"},\"serviceType\":{\"id\":5,\"name\":\"Local\"},\"status\":\"PAID\",\"total\":85.44}]"}],"isError":false} The invoice server validates the invoice-x-api-key header against its database-backed repository, and the tool call proceeds. Final Notes API key authentication is a pragmatic stepping stone, not an end state. In a production environment, OAuth 2.0 remains the recommended approach, and Spring Security supports both. Even with API keys in play, take the basics seriously: store secrets encoded (bcrypt or stronger), rotate them, use a distinct key per server, and combine with TLS so the headers aren't visible in transit. The resolver-chain pattern on the client side gives you a natural place to add more rules later — token-fetch logic for OAuth, region-based routing, anything URI-shaped — without touching the rest of the AI host. The next (also the last) article in this series concludes the tutorial by instrumenting the chat client with advisors for memory, token tracking, and logging, and ultimately formulating several takeaways. Resources [1] – The source code for the Spring AI Telecom Assistant [2] – asentinel-orm project [3] – MCP Inspector

By Horatiu Dan DZone Core CORE
MuleSoft MCP and A2A in Production: What 17 Recipes Reveal

I searched Stack Overflow for MuleSoft MCP implementation questions last week. Zero results. Searched Reddit r/mulesoft for A2A discussions. Zero threads. Checked the Salesforce Trailblazer community, the MuleSoft help forum, and Salesforce Stack Exchange. Nothing. Across seven community sources where MuleSoft practitioners ask for help, MCP and A2A implementation questions don't exist yet. Meanwhile, MuleSoft shipped MCP Connector GA in 2025. The A2A Connector hit general availability shortly after. Agentforce 3 is built on MCP interoperability. Every enterprise integration team I work with is evaluating agentic AI. The vendor announcements are loud. The practitioner content is empty. I maintain the mulesoft-cookbook — 588 production-grade MuleSoft recipes, 17 of which cover MCP and A2A. The complexity distribution across those 17 tells you where teams will struggle before they start. The Implementation Ladder Nobody Talks About Those 17 recipes aren't 17 variations of the same thing. They form a three-tier implementation ladder, and every existing tutorial stops at the bottom rung. Tier 1: Connectivity (4 Recipes) MCP IDE setup, MCP server basics, MCP client, URL-based servers. This is where the Medium tutorials live. Stand up an MCP server, expose a tool, call it from an AI agent. 
The MuleSoft documentation covers this well: XML <!-- MCP Server Config (Streamable HTTP) --> <mcp:server-config name="mcp-server-config" serverName="My MCP Server" serverVersion="1.0.0"> <mcp:streamable-http-server-connection listenerConfig="http-listener-config" /> </mcp:server-config> <!-- Expose a tool via mcp:tool-listener --> <flow name="getWeatherFlow"> <mcp:tool-listener config-ref="mcp-server-config" name="get-weather"> <mcp:description>Get current weather for a city</mcp:description> <mcp:parameters-schema><![CDATA[{ "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "city": {"type": "string", "description": "City name"} }, "required": ["city"] }]]></mcp:parameters-schema> </mcp:tool-listener> <http:request method="GET" url="https://api.open-meteo.com/v1/forecast" /> </flow> From mcp-server-basics. Three XML blocks and your Mule app is an MCP server. This is the part everybody writes about. Tier 2: Production Hardening (7 Recipes) OAuth security, streaming responses, resource subscriptions, distributed tracing, load-balanced servers, tool discovery exchange, URL-based server management. This is where tutorials vanish, and production failures begin. Take distributed tracing. An AI agent calls your MCP tool. Your tool calls a CRM API. The CRM API calls a database. Something is slow. Without tracing, you're guessing which hop is the bottleneck. 
With W3C Trace Context propagated through the MCP layer: XML <flow name="mcp-traced-tool"> <http:listener config-ref="HTTP_Listener" path="/mcp/tools/get-customer"/> <!-- Extract trace context from MCP request --> <set-variable variableName="traceId" value="#[attributes.headers.traceparent default correlationId]"/> <logger message="MCP tool call started | traceId=#[vars.traceId] | tool=get-customer"/> <!-- Propagate trace to downstream --> <http:request config-ref="CRM_API" path="/customers/#[payload.params.customerId]"> <http:headers>#[{"traceparent": vars.traceId}]</http:headers> </http:request> <logger message="MCP tool call completed | traceId=#[vars.traceId]"/> </flow> From distributed-tracing. The MCP protocol does not mandate tracing — you implement it as a convention. Skip this, and your first production debugging session for a slow agent call takes hours instead of minutes. OAuth security is the other Tier 2 essential. The MCP protocol does not natively define authentication. Every MCP endpoint exposed without OAuth is an open door. I've seen teams deploy MCP servers to CloudHub with no auth because the getting-started tutorial didn't include it. The oauth-security recipe adds token introspection at the MCP entry point — validate before any tool executes. Tier 3: Multi-Agent Orchestration (6 Recipes) A2A protocol fundamentals, agent card registry, push notifications, streaming artifacts, error recovery, and multi-agent orchestration. No existing practitioner content covers this tier at all. The orchestration pattern is where the architecture shifts. 
Instead of one agent calling one MCP server, you have an orchestrator decomposing tasks across specialized agents via A2A:

XML
<flow name="orchestrator-flow">
    <http:listener config-ref="HTTP_Listener" path="/orchestrate" method="POST"/>
    <!-- Step 1: Research agent gathers data -->
    <http:request config-ref="Research_Agent" path="/a2a/tasks/send" method="POST">
        <http:body>#[output application/json --- { jsonrpc: "2.0", method: "tasks/send", params: {task: {message: {role: "user", parts: [{type: "text", text: payload.query}]}}} }]</http:body>
    </http:request>
    <set-variable variableName="researchResult" value="#[payload.result.task.message.parts[0].text]"/>
    <!-- Step 2: Analysis agent processes research -->
    <http:request config-ref="Analysis_Agent" path="/a2a/tasks/send" method="POST">
        <http:body>#[output application/json --- { jsonrpc: "2.0", method: "tasks/send", params: {task: {message: {role: "user", parts: [{type: "text", text: vars.researchResult}]}}} }]</http:body>
    </http:request>
</flow>

From multi-agent-orchestration. Two agent calls, sequential. It looks simple. It isn't. Circular agent calls cause infinite loops, so you need call depth limits. Latency compounds with sequential calls, so parallelize where the task graph allows. Agent failures need fallback handling because agents fail differently than APIs: partial results, non-deterministic timeouts, and cascading retries.

The Production Gap

The complexity distribution across these 17 recipes reveals the problem. Only three are foundational or moderate, the level where current tutorials operate. Fourteen are moderately_hard, hard, complex, or very_complex. That means 82% of the implementation work lives above the tutorial line.

Compare this with the DataWeave pattern ecosystem in the same cookbook: 102 recipes, hundreds of Stack Overflow questions, dozens of practitioner-authored articles across DZone, Medium, and Hashnode. The community content ecosystem around DataWeave took years to form.
MCP and A2A are in their first year of GA availability. The content hasn't formed yet. This gap matters because teams are making architectural decisions right now about agentic integration. They're standing up MCP servers from the getting-started guide and calling it done. The questions they should be asking — how do I secure this? How do I trace agent calls? What happens when an agent in my orchestration fails? — have no community answers yet. Not on Stack Overflow. Not on Reddit. Not in any forum I could find. I've seen this pattern before. When MuleSoft introduced API-led connectivity in 2017, the getting-started guides covered Experience, Process, and System layers. What they didn't cover was what happens when your Process layer calls three System APIs and one of them is slow. Teams deployed and learned about circuit breakers, bulkheads, and caching through production incidents. The error-handling section of the cookbook has 51 recipes — most exist because somebody's integration failed at 2 AM. MCP and A2A are following the same trajectory. The difference is speed. API-led connectivity had years to build a community knowledge base before most enterprises adopted it. MCP went from announcement to GA to Agentforce integration in under a year. Teams are deploying before the community has had time to document what goes wrong. The danger isn't that teams will fail to build MCP servers. Tier 1 is straightforward. The danger is that they'll deploy unsecured, unobservable MCP endpoints to production and discover the Tier 2 problems through incidents rather than preparation. Three Things to Implement Before Going Multi-Agent If you're evaluating MCP and A2A for your MuleSoft environment, these three patterns determine whether your implementation scales or stalls. 1. OAuth on every MCP endpoint. The MCP protocol has no built-in authentication. Your MCP server is an HTTP listener with tool-specific routes. 
Without OAuth token validation at the entry point, any client that discovers your server URL can invoke your tools. Add introspection-based validation before any tool executes. Cache validation results to manage the latency overhead. See the oauth-security recipe for the full pattern — token extraction, introspection call, scope-to-tool mapping. 2. Distributed tracing from agent to data source. When an AI agent calls your MCP tool and the response takes 8 seconds, you need to know whether the latency is in the agent reasoning, the MCP transport, your Mule flow, or the backend API. Propagate W3C Trace Context through every hop. Most teams skip this because tracing feels optional until the first 8-second agent response hits production with no visibility into which hop caused it. The distributed-tracing recipe shows the extraction and propagation pattern. Without this, your first agent-related production incident is a guessing game. 3. A2A error recovery with fallback agents. Agents are not APIs. They have non-deterministic response times, they can return partial results, and LLM rate limits create transient failures that look permanent. The retry-then-fallback pattern in the error-recovery recipe uses until-successful with a secondary agent fallback. The critical detail: every retried task must be idempotent. Teams that skip this discover the problem when an agent triggers a duplicate Salesforce record update on retry — at 2 am with a production incident open. If your agent creates a record on each invocation, three retries mean three duplicate records. Design for idempotency first, retry second. What Comes Next The MCP/A2A ecosystem will develop the same way DataWeave patterns did — through practitioner implementation experience shared across communities, not through vendor documentation alone. Right now, the community is silent because most teams are still in evaluation or proof-of-concept. The production questions will come. 
The 17 recipes in the cookbook are a starting point — the implementation ladder from basic connectivity through production hardening to multi-agent orchestration. Each recipe links to a working configuration that you can deploy, test, and extend. What the ecosystem needs now is more practitioners sharing what they learn at Tier 2 and Tier 3: what breaks, what scales, what they wish they'd known before deploying their first MCP server to production. What I'd like to see from the community: implementations of MCP server patterns in non-standard environments — MCP behind API gateways with rate limiting per agent, A2A orchestration with more than two agents in the chain, tracing patterns for async agent workflows where the response comes minutes after the request. The patterns in the cookbook assume synchronous request-response. Real-world agent workflows are often asynchronous, event-driven, and unpredictable in latency. Those patterns need to be written from production experience, not from documentation. The full MCP and A2A recipe collection — including OAuth security, distributed tracing, and multi-agent orchestration — is in the mulesoft-cookbook on GitHub.
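The retry-then-fallback advice from the third item above ("design for idempotency first, retry second") can be sketched in a few lines. This is a minimal illustration in Python rather than Mule XML, and it is not the cookbook's error-recovery recipe. The names `IdempotentStore`, `call_with_fallback`, and the agent functions are hypothetical, and the in-memory dictionary stands in for whatever durable store a real deployment would use.

```python
import uuid
from typing import Callable, Dict, Optional

Agent = Callable[[str, str], str]  # (idempotency_key, task) -> result


class IdempotentStore:
    """Hypothetical in-memory record store keyed by idempotency key."""

    def __init__(self) -> None:
        self.records: Dict[str, str] = {}

    def create_once(self, key: str, value: str) -> str:
        # A repeated key returns the existing record instead of a duplicate.
        return self.records.setdefault(key, value)


def call_with_fallback(task: str, primary: Agent, fallback: Agent,
                       max_retries: int = 3, key: Optional[str] = None) -> str:
    """Retry the primary agent, then fall back, reusing one idempotency key."""
    key = key or str(uuid.uuid4())  # one key per logical task, shared by all attempts
    for _ in range(max_retries):
        try:
            return primary(key, task)
        except TimeoutError:
            continue  # transient failure: retry with the same key
    return fallback(key, task)


store = IdempotentStore()
attempts = {"n": 0}


def flaky_primary(key: str, task: str) -> str:
    attempts["n"] += 1
    store.create_once(key, task)  # the side effect runs on every attempt
    if attempts["n"] < 3:
        raise TimeoutError("response lost after the write")
    return "primary:" + task


def secondary(key: str, task: str) -> str:
    store.create_once(key, task)
    return "fallback:" + task


def always_down(key: str, task: str) -> str:
    raise TimeoutError("agent unavailable")


result = call_with_fallback("update-account-42", flaky_primary, secondary)
fallback_result = call_with_fallback("summarize", always_down, secondary, max_retries=2)
```

The first task is attempted three times, yet the store holds a single record for it because every attempt carries the same key. Without that key, each retry would have created another record.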

By Balachandra Shakar Bisetty
MuleSoft IDP: Enhancing Efficiency and Accuracy in Data Extraction

This article will help developers, architects, and readers understand MuleSoft's Intelligent Document Processing capabilities and functionality. After reading this article, the reader will understand how to use MuleSoft Intelligent Document Processing and the different use cases where it can be used. MuleSoft Intelligent Document Processing (IDP) helps you read and understand documents like invoices, purchase orders, and other structured or unstructured files. Using AI, it analyzes these documents and extracts key information, converting it into a clean, structured format. It uses AWS Textract to pull data from PDFs and images, making it easy to handle different document types. The extracted information can then be seamlessly integrated with tools like Anypoint Platform, MuleSoft RPA, Salesforce Flow, and Anypoint Composer, helping automate processes and improve efficiency. Use Cases of MuleSoft Intelligent Document Processing In many organizations, documents such as purchase orders and invoices arrive in a variety of formats and layouts, which makes manual processing time-consuming and error-prone. MuleSoft Intelligent Document Processing (IDP) addresses this challenge by automating the extraction and processing of information from both structured and unstructured documents. Reducing repetitive manual tasks helps teams save time, improve accuracy, and scale operations efficiently as document volumes increase. Beyond invoices and purchase orders, MuleSoft IDP can be applied across multiple industries and use cases. It is commonly used for processing healthcare documents and patient information, managing contracts, handling loan and insurance claim documents, and working with education and government records. It is also useful for organizing and extracting insights from legal documents, making it a versatile solution for any document-heavy workflow. 
MuleSoft IDP supports commonly used file formats such as PDF, PNG, and JPEG, allowing businesses to work with both digital and scanned documents. It also supports multiple languages, including English, Spanish, and German, making it suitable for global use cases.

To ensure secure and efficient operations, MuleSoft IDP follows defined data retention policies along with certain limits and quotas. These controls govern how long data is stored and how much processing can be performed, depending on the specific configuration and usage. For precise details, refer to MuleSoft’s official documentation.

The following are the data retention policy and limits/quotas supported by MuleSoft Intelligent Document Processing:

Figure 1: MuleSoft intelligent document processing limit and supported

The following is the data retention policy for MuleSoft Intelligent Document Processing:

Document Action Editor
  • Modified files are temporarily stored in the Document Action Editor while testing is underway.
  • When the editor is closed or a new file is uploaded, the extracted data is automatically deleted.

Document Action Execution Endpoint
  • Data is stored securely while it is being executed, and files are kept in an S3 bucket.
  • Data on successful executions is kept for 7 days.
  • For executions that need human review, data is kept for 7 days following task completion.
  • Unfinished work is kept for 60 days.
  • Users cannot set retention periods.

IDP extracts data by default using its natural language processing model (IDP NLP) in response to the preset prompts. When you create a document action, you can instead choose Einstein to examine the document and extract the data. Use Einstein to extract information from unstructured documents, such as a driver’s license, an insurance claim record, or a medical record with handwriting, or to answer complicated queries about the document, such as how much an invoice will cost after taxes and other considerations.
To analyze and understand documents, MuleSoft Intelligent Document Processing (IDP) uses a combination of AI models rather than relying on a single one. Through Salesforce Einstein, it leverages multiple advanced large language and multimodal models via the Einstein Trust Layer, which ensures secure and governed access to AI capabilities. Some of the key supported models include:

  • Einstein OpenAI GPT-4o: A strong general-purpose model suitable for most document processing tasks. It performs well even with non-Latin languages and can identify layout elements like font sizes and styles. However, it has lower accuracy when reading checkboxes in forms, so prompting it clearly (for example, asking it not to assume missing data) helps improve results.
  • Einstein OpenAI GPT-4o Mini: A faster model designed for more focused tasks. While it delivers quick responses, it may sometimes show less detailed reasoning. It also has limitations in accurately interpreting checkboxes in forms.
  • Einstein Gemini 2.0 Flash 001: Particularly effective for image-heavy documents, offering better accuracy in visual analysis. It provides moderate accuracy for checkbox detection, especially when documents are processed one page at a time, and supports structured outputs.
  • Einstein Gemini 2.5 Flash: An improved version of the Gemini Flash model, offering faster performance and higher accuracy, especially for image-based documents and complex layouts.

In addition to these models, MuleSoft IDP uses AWS Textract to extract text, tables, and key-value pairs from documents such as PDFs and images. It may also incorporate other AI services from Salesforce and AWS to improve tasks like document classification, entity recognition, and data extraction. By combining these models and services, MuleSoft IDP can not only extract information from documents but also understand context and structure, making the data ready for seamless integration and automation across platforms.
A document action is a multi-step procedure that scans a document, filters out fields, and returns a structured response in the form of a JSON object using several AI engines. Every document action specifies the fields to be extracted, the fields to be filtered out of the response, and the kinds of documents it accepts as input. The following are the components of a document action:

  • Document types: Outlines the kinds of documents that are acceptable for input.
  • Fields to extract: Specifies which document fields should be extracted.
  • Fields to filter out: Specifies which response fields should be excluded.
  • Confidence score: Indicates the likelihood that the value was accurately extracted by IDP.
  • Prompts: Ask questions in natural language to improve the data extraction procedure.
  • Reviewers: Specifies who examines documents that fall below the confidence score threshold.

For each field to be extracted, you can set the minimum acceptable confidence score, mark the field as required, or hide it. You can also set up prompts that refine the data extraction by posing natural language queries.

The confidence score indicates the probability that IDP correctly extracted the value from a document. A 100% confidence score, for instance, indicates that IDP extracted the value completely accurately. A 70% confidence score, on the other hand, indicates that there is a 30% possibility that the extracted value is incorrect.

Publishing to Anypoint Exchange

MuleSoft Intelligent Document Processing allows you to publish the document action to Anypoint Exchange as an asset that provides the following endpoints.

POST /executions: This endpoint allows you to post the document to MuleSoft Intelligent Document Processing for scanning and extracting data.
The following curl command can be used to post the document:

Shell
curl -H "Authorization: Bearer <Bearer_Token>" \
  -F "file=@\"test.pdf\"" \
  https://{idp_domain}.us-east-1.anypoint.mulesoft.com/api/v1/organizations/{organizationId}/actions/{documentActionId}/versions/{assetVersion}/executions

GET /executions/{executionId}: This endpoint allows you to retrieve the execution status (Success or Manual Validation Required), together with the fields and prompt responses, for a document that has been posted to IDP. The following curl command can be used to retrieve the IDP execution status:

Shell
curl -H "Authorization: Bearer <Bearer_Token>" \
  https://{idp_domain}.us-east-1.anypoint.mulesoft.com/api/v1/organizations/{organizationId}/actions/{documentActionId}/versions/{assetVersion}/executions/{executionId}

To access the above APIs, you need an authorization (Bearer) token. To generate one, create a connected app in the Anypoint Platform with the scope “Execute Document Actions.” Once you have registered the connected app, it provides the clientId and clientSecret, which can be used in the curl command below to generate the token.

Shell
curl -X POST -H "Content-Type: application/json" \
  -d "{\"grant_type\": \"client_credentials\", \"client_id\": \"<Client_Id>\", \"client_secret\": \"<Client_Secret>\"}" \
  https://anypoint.mulesoft.com/accounts/api/v2/oauth2/token

The Bearer token returned in the response can then be used in the POST and GET requests above.

Benefits of MuleSoft Intelligent Document Processing

The following are the benefits of MuleSoft Intelligent Document Processing:

  • Reduce cost: Intelligent document processing can lower costs by automating manual document data extraction.
  • Improve efficiency and productivity: Intelligent document processing can work around the clock and process documents more quickly than manual methods.
  • Reduce human errors: By using automated extraction and validation, intelligent document processing can reduce human error.
  • Improve accuracy: Automated extraction and validation also help guarantee data consistency.
  • Easy to integrate: MuleSoft IDP can be easily integrated with Robotic Process Automation (RPA) by using the REST APIs provided by IDP.

Conclusion

In conclusion, MuleSoft Intelligent Document Processing (IDP) offers a powerful and practical way for organizations to modernize how they handle documents. By combining AI-driven extraction with seamless integration capabilities, it helps reduce manual effort, improve data accuracy, and accelerate business processes. As businesses continue to deal with large volumes of unstructured data, solutions like IDP become increasingly important. They not only streamline operations but also support better compliance, scalability, and decision-making. By adopting MuleSoft IDP, organizations can enhance productivity, lower operational costs, and ultimately deliver a better experience to their customers.

  • MuleSoft Intelligent Document Processing (Invoice Processing Using MuleSoft IDP): Part I
  • MuleSoft Intelligent Document Processing (Invoice Processing via IDP REST APIs): Part II
  • MuleSoft Intelligent Document Processing: Generic Document Processing With IDP: Part III
  • MuleSoft Intelligent Document Processing: Generic Document Processing via IDP REST APIs: Part IV
  • MuleSoft Intelligent Document Processing: IDP Callback and Automation: Part V
  • MuleSoft Intelligent Document Processing: Supercharge Document Automation With Einstein AI: Part VI
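Because a confidence score is the probability that a value was extracted correctly, its complement is the chance of error (for a score of 0.70, that is 0.30). The rule that fields below their minimum score go to reviewers can be sketched as follows. This is an illustration only: the dictionary layout and the names `error_probability` and `needs_review` are hypothetical and do not reflect the actual JSON returned by the IDP execution endpoint.

```python
from typing import Dict, List


def error_probability(confidence: float) -> float:
    """Chance the value is wrong, given a confidence score between 0 and 1."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0 and 1")
    return round(1.0 - confidence, 10)  # rounding hides float noise


def needs_review(fields: Dict[str, float], thresholds: Dict[str, float],
                 default_threshold: float = 0.8) -> List[str]:
    """Return the names of fields whose confidence is below their minimum."""
    return sorted(
        name for name, confidence in fields.items()
        if confidence < thresholds.get(name, default_threshold)
    )


# Hypothetical extraction result: field name -> confidence score.
extracted = {"invoiceNumber": 0.99, "totalAmount": 0.70, "dueDate": 0.85}
# Hypothetical per-field minimums; other fields use the default threshold.
minimums = {"totalAmount": 0.90, "dueDate": 0.80}

flagged = needs_review(extracted, minimums)
```

Here only `totalAmount` falls below its minimum, so it is the only field a reviewer would need to check.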

By Jitendra Bafna
A Hands-On ABAP RESTful Programming Model Guide

In the SAP ABAP world, BAPIs (Business Application Programming Interfaces) have long been the standard for programmatic access to business functionality. These are essentially RFC-enabled function modules representing operations on business objects. However, with the advent of SAP S/4HANA and the new ABAP RESTful Application Programming Model (RAP), the classic BAPI approach is being phased out in favor of modern, RAP-based APIs. SAP now provides a new generation of released APIs built on RAP, often referred to as RAP BO interfaces or behavior definitions, which align with the clean core and cloud-ready strategy of S/4HANA. Why Replace BAPIs With RAP-Based APIs? For expert ABAP developers, the motivation to migrate from BAPIs to RAP interfaces comes down to modernization, maintainability, and future-proofing. In fact, SAP considers RAP business object interfaces the modern alternative to classic BAPIs. Here are some key reasons to make the switch: Clean Core and Upgrade Safety RAP interfaces are part of SAP’s clean core strategy. They are released and maintained by SAP, ensuring that your custom code calls stable, upgrade-safe APIs instead of custom wrappers or unofficial function modules. This means less retrofitting when you upgrade your S/4HANA system. Cloud Readiness In SAP S/4HANA Cloud, many traditional BAPIs are not whitelisted or available. RAP-based APIs, on the other hand, are designed for cloud use and adhere to strict SAP release contracts. They can be used in both private and public cloud editions as officially supported integration points. Modern API Design RAP uses contemporary ABAP development paradigms, CDS views for data modeling, behavior definitions (BDEF) for business logic, and OData/HTTP exposure by design. Instead of procedural function calls, you interact with entities through a standardized interface. This leads to clearer, more maintainable code and aligns with how SAP Fiori apps communicate. 
In essence, RAP interfaces provide a modern, RESTful facade over business objects, whereas BAPIs are older RPC-style calls. Built-in Transaction Handling Classic BAPIs often require manual transaction control. RAP introduces the Entity Manipulation Language (EML), which handles operations in a transactional context. You can bundle multiple operations on related entities and then perform one commit for all — simplifying the code and reducing errors. We’ll see this in the example below. Extensibility and Lifecycle Management RAP business object interfaces allow you to expose only what is needed to consumers, supporting versioning and gradual enhancements. They act as a stable facade while the underlying logic can evolve without breaking external contracts. This is analogous to how BAPIs provided stable interfaces in the past, but RAP makes it more flexible. In short, RAP BO interfaces let you keep your core objects clean and internal, exposing a controlled API layer to the outside world. Given these benefits, SAP recommends using a released RAP API whenever one is available for your scenario. Next, let's walk through a hands-on example of replacing a common BAPI with an RAP-based API in an S/4HANA 2022 system. ABAP RAP in S/4HANA 2022 at a Glance SAP S/4HANA 2022 includes the ABAP Platform 2022, which significantly expands RAP’s capabilities and the number of standard business objects exposed via RAP. Many operations that used to be performed with BAPIs now have equivalent released RAP BO interfaces provided by SAP. These interfaces are essentially pre-built RAP business objects that SAP delivers for developers to use instead of calling older BAPIs. 
For example:

  • Bank creation: BAPI_BANK_CREATE to I_BANKTP (RAP Business Object interface)
  • Cost center creation: BAPI_COSTCENTER_CREATE to I_COSTCENTERTP_2 (RAP BO interface)
  • Sales order creation: BAPI_SALESORDER_CREATEFROMDAT2 to I_SALESORDERTP (RAP BO interface successor)

These are just a few examples; SAP provides a growing list of such RAP-based APIs for S/4HANA 2022 and beyond. Each interface typically comes with documentation and code snippets to demonstrate how to consume it. If you are unsure whether a particular BAPI has an RAP equivalent, SAP’s Clean Core Object search tool can help identify if a released RAP interface (BDEF) exists for that business object.

Hands-On Example: Modernizing a BAPI to RAP

To make this concrete, let’s walk through replacing a classic BAPI with a RAP-based implementation. We’ll use the task of creating a Sales Order as our example. In the past, an ABAP developer might use the BAPI BAPI_SALESORDER_CREATEFROMDAT2 to programmatically create a sales order in SAP ECC or S/4HANA. In S/4HANA 2022, SAP provides the RAP business object interface I_SALESORDERTP as the official successor for this BAPI. We’ll demonstrate how to identify the replacement and implement the create operation using RAP.

Step 1: Identify the RAP Replacement Interface

First, determine if a released RAP API exists for the BAPI in question. In our case, we discovered that I_SALESORDERTP is the released RAP interface corresponding to the Sales Order business object (as a replacement for BAPI_SALESORDER_CREATEFROMDAT2). This interface is delivered by SAP as part of S/4HANA 2022. You can find such mappings via SAP documentation or tools; for instance, SAP's help docs and community posts confirm that I_SALESORDERTP covers create/read/update operations for sales orders via RAP. Once the appropriate interface is identified, you can plan to refactor your code to use it.
Step 2: Review the Classic BAPI Usage (For Comparison)

Let's briefly recap how the classic BAPI would be used to create a sales order. Typically, one would call the function module and then commit the work, for example:

Plain Text
" Header and item data are assumed to be filled before the call
DATA: ls_order_header TYPE bapisdhd1,
      lt_items        TYPE TABLE OF bapisditm,
      lt_return       TYPE TABLE OF bapiret2.

CALL FUNCTION 'BAPI_SALESORDER_CREATEFROMDAT2'
  EXPORTING
    order_header_in = ls_order_header   " header data structure
  TABLES
    order_items_in  = lt_items          " item lines to create
    return          = lt_return.        " return messages

" Treat both error (E) and abort (A) messages as failures
IF sy-subrc = 0
   AND NOT line_exists( lt_return[ type = 'E' ] )
   AND NOT line_exists( lt_return[ type = 'A' ] ).
  " If no errors, commit the transaction to finalize the create
  CALL FUNCTION 'BAPI_TRANSACTION_COMMIT'
    EXPORTING
      wait = 'X'.
ENDIF.

In the above pseudocode, we populate structures for the sales order header and items, call the BAPI, then explicitly call BAPI_TRANSACTION_COMMIT to ensure the new order is saved. We also have to check the return table for errors or messages. This imperative style works, but it requires handling commits manually and ensuring the data structures match what the BAPI expects. It also doesn’t automatically integrate with OData or modern Fiori UI frameworks without creating a custom OData service on top.

Step 3: Implement the RAP-Based Create via EML

Now, let's perform the same operation using the RAP interface I_SALESORDERTP and ABAP’s Entity Manipulation Language. With RAP, you don't call a function module; instead, you use EML statements on the interface’s entities. Suppose we want to create a sales order with one line item. We can do it in one shot as follows:

Plain Text
" ls_failed and ls_reported are declared inline by the FAILED/REPORTED clauses
MODIFY ENTITIES OF i_salesordertp
  ENTITY salesorder
    CREATE
      FIELDS ( salesordertype salesorganization distributionchannel
               organizationdivision soldtoparty purchaseorderbycustomer )
      WITH VALUE #( ( %cid  = 'H001'                               " temporary ID for the new order
                      %data = VALUE #(
                        salesordertype          = 'TA'             " Order type
                        salesorganization       = '1010'           " Sales Org
                        distributionchannel     = '10'             " Distribution Channel
                        organizationdivision    = '00'             " Division
                        soldtoparty             = '0010100001'     " Sold-to Customer ID
                        purchaseorderbycustomer = 'PO-12345'       " Customer PO reference
                      ) ) )
    CREATE BY \_item                                               " now specify the item(s)
      FIELDS ( product requestedquantity )
      WITH VALUE #( ( %cid_ref = 'H001'                            " link to the order with temp ID
                      %target  = VALUE #( ( %cid              = 'I001'   " temp ID for new item
                                            product           = 'MAT-1000'
                                            requestedquantity = 5 ) ) ) )
  FAILED   DATA(ls_failed)
  REPORTED DATA(ls_reported).

In this code snippet, we use the MODIFY ENTITIES OF i_salesordertp statement to perform a deep create of a SalesOrder along with one SalesOrderItem. A few things to note from an engineer’s perspective:

  • We specify the fields we want to set for the sales order and its item, similar to filling out the BAPI’s parameters, but here it's done with a structured syntax using WITH VALUE #(...) expressions. This ensures compile-time checking of field names and data types.
  • The use of %cid (content ID) and %cid_ref is a mechanism to link the parent and child in one request. We assign a temporary ID 'H001' to the new sales order, and then reference it (%cid_ref = 'H001') when creating the item, so that the framework knows this item belongs to the new order. This allows creating a header and its items together in one call (a deep insert), which is handled seamlessly by RAP.
  • The FAILED DATA(ls_failed) and REPORTED DATA(ls_reported) clauses capture any errors or messages from the operation. This is analogous to checking the BAPI return table. If the create fails, the ls_failed structure contains the failed keys and the failure cause.
The ls_reported structure holds the related messages; generated keys are returned through a separate MAPPED response, which this snippet does not request.

At this point, the sales order is created in memory, in the transactional buffer. To finalize and commit it to the database, we need to execute a commit in the RAP context.

Step 4: Finalize With Commit

In an interactive RAP scenario (like a Fiori app), the commit is managed by the framework when the user saves their changes. But in our manual ABAP code (above), we explicitly trigger the commit. In RAP, this is done using COMMIT ENTITIES rather than the classic BAPI_TRANSACTION_COMMIT. For example:

Plain Text
COMMIT ENTITIES
  RESPONSE OF i_salesordertp
  FAILED   DATA(ls_save_failed)
  REPORTED DATA(ls_save_reported).

IF sy-subrc <> 0.
  " Handle the error (e.g., log the messages in ls_save_reported)
ENDIF.

COMMIT ENTITIES will finalize all pending changes made by EML statements. Here, we also capture any final errors or messages in ls_save_failed/ls_save_reported for error handling. After this commit, the sales order is persisted, and the actual Sales Order number is available. SAP RAP supports late numbering, meaning the real keys might only be assigned at commit time, a detail to be aware of. With late numbering, the final keys can be retrieved with the CONVERT KEY OF statement, which is used inside a COMMIT ENTITIES BEGIN ... END block; for our purposes, assume the new order number is available once the commit succeeds.

Now we have successfully created a sales order using the RAP API instead of the classic BAPI. The code is more declarative and leverages the RAP framework for integrity checks and relationships. We didn't have to call any function modules directly; everything was done through the RAP interface of the Sales Order business object.

Engineer’s Perspective: Key Takeaways

From the above process, a few important differences stand out when replacing BAPIs with RAP-based APIs:

  • Finding the right interface: It may require a bit of research to find the correct RAP interface name for a given legacy BAPI. SAP provides documentation to help with this. Once found, using the RAP interface ensures you are calling an SAP-supported API that will remain stable across updates.
  • Refactoring effort: Replacing a BAPI call with a RAP call is not a one-to-one code replacement; it often involves refactoring. You will define data structures according to RAP’s CDS-based types and use EML syntax. There is a learning curve, but the benefit is cleaner ABAP code that is easier to maintain. Tools in ABAP Development Tools (ADT) can help by autocompleting structure components and EML keywords, since the interface and its types are known in the dictionary.
  • Behavior and validation: When you use RAP interfaces, you automatically leverage SAP’s implemented behavior logic. For instance, any checks or defaulting logic SAP built into the Sales Order RAP BO will execute. You no longer manually call multiple BAPIs or perform intermediate checks; the RAP framework triggers all the necessary business logic as part of the modify/commit process. This leads to fewer errors and a more consistent outcome with standard SAP behavior.
  • Transaction management: As shown, instead of BAPI_TRANSACTION_COMMIT, you use COMMIT ENTITIES. This aligns with the unit of work paradigm in RAP. It also allows bundling multiple operations.
  • Upgrade and extensibility: By adopting RAP-based APIs, your custom code is positioned for the future. SAP will continue to enhance these interfaces, and you can opt in to those changes by switching to a new interface version or extending the CDS views. In contrast, classic BAPIs in S/4HANA are largely in maintenance mode; they exist mainly for backward compatibility and might not cover new business scenarios introduced in S/4HANA. Moving to RAP ensures you can take advantage of new SAP innovations with minimal effort.

Conclusion

Replacing BAPIs with RAP-based APIs is a strategic move for any ABAP developer working on S/4HANA 2022 and beyond. It aligns your custom developments with SAP’s modern, cloud-ready programming model.
As we’ve seen, a classic scenario like sales order creation can be accomplished with RAP’s EML in a way that is more in line with modern development practices, yielding cleaner and more robust code. SAP itself positions RAP Business Object interfaces as the evolution of BAPIs, a way to build stable, public APIs that keep the core clean and extensible. While there is an upfront effort to learn RAP and refactor existing code, the payoff is significant in the long run, with better maintainability, fewer upgrade headaches, and the ability to run your extensions in the cloud. Importantly, whenever SAP provides a released RAP API to replace a classic BAPI, you should take that path. By doing so, you leverage SAP’s latest technology and ensure your solutions remain supported in future releases.
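To make the %cid / %cid_ref linking and late numbering more tangible, here is a small conceptual model written in Python. It is an illustration only: it is not ABAP, and it does not describe how the RAP framework is implemented. The class `Buffer`, its methods, and the number range starting at 1000 are all hypothetical.

```python
from itertools import count
from typing import Dict, List, Tuple


class Buffer:
    """Toy transactional buffer: temporary content IDs now, real keys at commit."""

    def __init__(self) -> None:
        self.headers: List[str] = []                   # header content IDs
        self.items: List[Tuple[str, str, dict]] = []   # (item cid, header cid_ref, data)
        self.saved: List[dict] = []                    # rows "persisted" by commit
        self._numbers = count(1000)                    # hypothetical number range

    def create_header(self, cid: str) -> None:
        self.headers.append(cid)

    def create_item(self, cid: str, cid_ref: str, data: dict) -> None:
        # The item can only point at a header created in the same request.
        if cid_ref not in self.headers:
            raise KeyError(f"unknown content ID reference: {cid_ref}")
        self.items.append((cid, cid_ref, data))

    def commit(self) -> Dict[str, int]:
        """Assign final keys (late numbering) and return a cid -> key mapping."""
        mapped = {cid: next(self._numbers) for cid in self.headers}
        self.saved = [
            {"order": mapped[ref], "item": cid, **data}
            for cid, ref, data in self.items
        ]
        return mapped


buf = Buffer()
buf.create_header("H001")
buf.create_item("I001", cid_ref="H001", data={"product": "MAT-1000", "quantity": 5})
mapped = buf.commit()
```

Until `commit()` runs, the item only knows its parent as 'H001'. The real order number exists only afterwards, which is why the temporary ID is needed to tie header and item together in a single request.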

By Deepika Paturu
Implementing Secure API Gateways for Microservices Architecture

Modern microservice architectures consist of many independently deployable services, which brings new security challenges. One crucial best practice is to use an API Gateway as a centralized entry point to enforce security policies. In this article, we explore how to implement a secure API gateway in a microservices environment and demonstrate authentication configuration with code examples.

Why Use an API Gateway for Microservices Security

In a microservices architecture, each service exposes its own REST endpoints. Without a gateway, clients would have to authenticate individually with each service, a complex and error-prone approach. An API Gateway acts as a single entry point for all client requests, simplifying communication and centralizing cross-cutting concerns like security. As Chris Richardson notes, the API Gateway is also an ideal place to implement cross-cutting concerns, such as authentication. By routing all external traffic through a gateway, you can offload tasks like authentication, authorization, encryption, and rate limiting to this layer.

Some key benefits of using an API gateway for security include:
  • Unified access control: The gateway enforces robust access controls in one place. This avoids duplicating auth logic in every microservice and ensures consistent policies.
  • Isolation of internal services: Microservices are not exposed directly to clients. The gateway shields internal APIs, preventing unauthorized access and reducing the attack surface. Backend services can trust that requests have passed through security checks.
  • Monitoring and logging: As the single entry point, the gateway can log requests and monitor traffic for security analytics.
  • Other edge functions: API Gateways often handle routing to the correct service, load balancing, input validation, and rate limiting to mitigate DDoS attacks. Centralizing these functions improves efficiency and maintainability.
In summary, a gateway in front of your microservices allows you to apply consistent security measures across all services and simplify each service’s implementation. The microservices can remain focused on business logic while the gateway manages authentication and other front-line defenses.

Open-Source API Gateway Solutions

There are numerous technologies available to implement the API Gateway pattern. Since we are focusing on open-source solutions, here are a few popular choices:
  • Kong gateway: An open source, high-performance API gateway built on NGINX. Kong supports flexible routing rules and a rich plugin ecosystem for authentication, rate limiting, transformations, and more.
  • Envoy proxy: A modern L7 proxy often used in service meshes. Envoy can serve as an edge gateway with filters for features like JWT verification.
  • Traefik: A cloud-native edge router written in Go. Traefik integrates well with orchestrators and provides middleware for things like basic authentication, OAuth/OIDC via forward-auth, and automatic TLS. It’s often used for its easy dynamic configuration and Let’s Encrypt integration.
  • Others: There are open-source API management solutions like Tyk, Gravitee, WSO2 API Manager, KrakenD, etc. Each offers varying features, but at their core, they all provide gateway functionality to route and secure microservice APIs.

Each of these solutions can secure your microservices, but their configuration and feature set differ. In the next section, we’ll focus on implementing authentication using Kong API Gateway as an example. Kong is a popular choice due to its lightweight nature and plugin flexibility, but the concepts will be similar for other gateways.

Implementing Authentication in an API Gateway

To illustrate how to implement a secure API gateway, we’ll walk through setting up Kong Gateway in front of a microservice and enabling authentication. The goal is to require a valid JSON Web Token (JWT) for clients calling the microservice via the gateway.

1. Defining Services and Routes in Kong

First, we need to configure Kong with a Service and a Route for our microservice. In Kong’s terminology, a Service object represents an upstream microservice, and a Route defines how requests from clients map to that service. Kong will listen for client requests on the route and forward them to the specified service.

For example, we create a declarative configuration file for Kong in DB-less mode. Below is a snippet of kong.yml defining a service and route, and then attaching the JWT authentication plugin to that service:

YAML

_format_version: "2.1"
services:
  - name: my-api-service
    url: http://localhost:3000   # upstream microservice URL
routes:
  - name: api-route
    service: my-api-service
    paths:
      - /api                     # clients will call /api on the gateway
plugins:
  - name: jwt
    service: my-api-service
    enabled: true
    config:
      key_claim_name: kid        # use 'kid' field in JWT header to identify the key
      claims_to_verify:
        - exp                    # ensure the 'exp' (expiry) claim is valid

In this config, we map my-api-service to the upstream at localhost:3000, and we route any requests with path /api on the gateway to that service. The jwt plugin is enabled on the service, which means Kong will require a valid JWT on all requests to this service. We configured the plugin to check the token’s exp claim and to expect a kid claim in the JWT header to identify the signing key.

After loading this config and starting Kong, any request to http://<gateway>:8000/api will be intercepted by Kong. Since the JWT plugin is active, Kong will attempt to find a JWT in the request and validate it. If no token is present or the token is invalid, Kong will respond with 401 Unauthorized. At this point, our microservice is protected: only authenticated calls are forwarded.

2. Configuring Credentials (JWT Issuers and Secrets)

Now that Kong is blocking unauthorized requests, we need to configure who is considered an authorized consumer and what credentials (JWTs) are accepted.
In a real system, you would likely integrate with an Identity Provider or authorization server that issues JWTs. Kong uses the concept of Consumers to represent clients or user identities that consume your APIs. Each consumer can have credentials associated with it. We will add a consumer entry and a JWT secret for that consumer in our kong.yml:

YAML

consumers:
  - username: auth-service
jwt_secrets:
  - consumer: auth-service
    key: my-issuer-key-123       # 'kid' value expected in the JWT header
    secret: my-jwt-signing-secret

In this snippet, we added a consumer named auth-service. We then added a JWT credential for that consumer with a secret and a key. The key is a unique identifier and the secret is the HMAC secret that this consumer will use to sign JWTs. Essentially, we are telling the gateway to accept JWTs signed with my-jwt-signing-secret, as long as they carry kid: my-issuer-key-123 in their header, and treat them as coming from the auth-service consumer.

With this configuration, the JWT plugin knows how to verify incoming tokens: it will look at the kid claim in the JWT header to find the matching consumer’s secret, then verify the token’s signature using that secret. It also checks the exp claim to ensure the token has not expired.

3. Testing the Secured Gateway

Now the secure gateway is configured. Let’s quickly illustrate the behavior with example requests:

Shell

# Attempt to call the service without any token
$ curl -i http://localhost:8000/api
HTTP/1.1 401 Unauthorized
...

# Now, obtain or craft a JWT signed with 'my-jwt-signing-secret' and kid 'my-issuer-key-123'.
# For demonstration, we can use an online tool or a JWT library to create a token:
# Header: { "alg": "HS256", "typ": "JWT", "kid": "my-issuer-key-123" }
# Payload: { "sub": "user123", "exp": <some future timestamp>, ... }
# Sign it with the secret.
# Call the API with the JWT in the Authorization header
$ curl -i -H "Authorization: Bearer <your_jwt_token_here>" http://localhost:8000/api
HTTP/1.1 200 OK
Hello world!

As shown, without a valid JWT, the gateway returns a 401 Unauthorized response. With a valid JWT, Kong will authenticate the request and route it to the upstream service, which returns the expected data (HTTP 200 OK). The microservice itself did not need to implement any auth checks; the gateway handled it.

Conclusion

Implementing a secure API gateway for microservices involves setting up a robust gateway solution and offloading security concerns to it. We demonstrated how to use Kong to enforce JWT authentication in front of a microservice. The gateway approach streamlines authentication across all services. Once a request is verified at the edge, microservices can rely on that identity, and the gateway becomes one layer in a defense-in-depth design. Open source API gateways like Kong, Envoy, and Traefik provide the building blocks to authenticate and authorize traffic, handle encryption, and apply policies uniformly. By centralizing these concerns, engineering teams can avoid duplicating security code across microservices and instead manage it in one place. As a result, the overall system becomes easier to secure and maintain.

For advanced scenarios, from integrating with enterprise SSO/IdPs to implementing multi-tenant auth or fine-grained access control, the gateway can be extended with plugins or external auth services. The key is to establish the gateway as the trust barrier between clients and your microservices. With a secure API gateway in place, a microservices architecture can achieve both agility and strong security compliance.
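The testing step above leaves token creation to "an online tool or a JWT library". As a rough illustration of what such a tool does, here is a minimal sketch that builds an HS256 token using only the JDK. The class and method names (`JwtSketch`, `createToken`, `sign`) are hypothetical, the JSON is assembled by hand purely for brevity, and the key and secret are the demo values from the kong.yml snippet; a real issuer would normally use a maintained JWT library.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper, not part of Kong: builds an HS256-signed JWT by hand.
class JwtSketch {
    private static final Base64.Encoder B64URL = Base64.getUrlEncoder().withoutPadding();

    // Base64url-encodes a JSON string without padding, as JWT segments require.
    public static String encode(String json) {
        return B64URL.encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    // Computes the HMAC-SHA256 signature over "<header>.<payload>".
    public static String sign(String signingInput, String secret) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        return B64URL.encodeToString(mac.doFinal(signingInput.getBytes(StandardCharsets.UTF_8)));
    }

    // Assembles header, payload, and signature into a compact JWT.
    public static String createToken(String kid, String secret, String subject, long expEpochSeconds)
            throws Exception {
        String header = "{\"alg\":\"HS256\",\"typ\":\"JWT\",\"kid\":\"" + kid + "\"}";
        String payload = "{\"sub\":\"" + subject + "\",\"exp\":" + expEpochSeconds + "}";
        String signingInput = encode(header) + "." + encode(payload);
        return signingInput + "." + sign(signingInput, secret);
    }

    public static void main(String[] args) throws Exception {
        // Token valid for one hour, using the demo key and secret from kong.yml.
        long exp = Instant.now().plusSeconds(3600).getEpochSecond();
        System.out.println(createToken("my-issuer-key-123", "my-jwt-signing-secret", "user123", exp));
    }
}
```

The printed value is what would go after `Bearer ` in the Authorization header of the curl call.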

By Mugunth Chandran
Event-Driven Pipelines With Apache Pulsar and Go

A Practical Walkthrough

Most distributed systems eventually hit a wall with their messaging layer, whether it's Kafka's tight coupling between compute and storage, RabbitMQ's limited replay capabilities, or the operational overhead of managing multiple tools for queuing and streaming. Apache Pulsar was engineered to address these gaps from the ground up. In this article, we'll dissect a working Go-based demo that wires together a Pulsar producer, consumer, and Prometheus monitoring layer into a cohesive, observable messaging pipeline. The full source is on GitHub.

Why Pulsar Deserves a Closer Look

Pulsar's architecture makes a deliberate trade-off that most messaging systems avoid: it physically separates the broker tier (which handles routing, subscriptions, and protocol) from the storage tier (Apache BookKeeper, which handles persistence). This isn't just an implementation detail. It means you can independently autoscale message routing capacity without touching your storage cluster, and vice versa.

Beyond the architecture, a few capabilities stand out for engineering teams:
  • Multi-tenancy at the protocol level – tenants, namespaces, and topics form a three-level hierarchy, making it practical to run a single Pulsar cluster for multiple teams or services without namespace collisions.
  • Four distinct subscription semantics – Exclusive, Shared, Failover, and Key_Shared give you precise control over how messages are distributed across consumer instances, something Kafka's consumer group model doesn't natively offer.
  • Cursor-based message retention – Pulsar retains messages based on subscription cursors, not time-based log compaction. A consumer that falls behind doesn't lose messages; it simply catches up from its last acknowledged position.
  • Native schema enforcement – The built-in schema registry validates message payloads at the broker level before they reach consumers, catching contract violations at the boundary rather than deep inside application logic.
What the Demo Builds

The project is structured as three independent Go binaries, each with a single responsibility:

├── producer/
│   ├── main.go       # HTTP server → Pulsar publisher
│   ├── go.mod
│   └── go.sum
├── consumer/
│   └── main.go       # Pulsar subscriber → message processor
├── monitor/
│   └── main.go       # Pulsar producer + Prometheus metrics server
├── prometheus.yml    # Scrape configuration
└── README.md

All three use `github.com/apache/pulsar-client-go/pulsar`, the official, Apache-maintained Go client. The client is not a thin wrapper; it implements the full Pulsar binary protocol, connection pooling, producer batching, and automatic Prometheus metric registration.

Component 1: The Producer

The producer exposes a single HTTP endpoint. An incoming HTTP request triggers a Pulsar publish operation, decoupling the caller from any direct knowledge of the messaging infrastructure.

Go

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/apache/pulsar-client-go/pulsar"
)

// client is shared by all HTTP requests.
var client pulsar.Client

func main() {
	var err error
	client, err = pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
		msg := r.URL.Query().Get("msg")
		if err := publishMessage(msg); err != nil {
			// Report the failure to the caller instead of returning 200.
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte("msg failed to publish"))
			return
		}
		w.Write([]byte("msg successfully published"))
	})

	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

func publishMessage(msg string) error {
	// The demo creates a producer per call; production code should create it once and reuse it.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "my-topic",
	})
	if err != nil {
		// Return the error rather than terminating the whole server.
		return err
	}
	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte(msg), // publish the text received in the "msg" query parameter
	})
	return err
}

A few architectural observations worth unpacking: The HTTP-to-Pulsar bridge pattern is deliberately pragmatic.
Rather than requiring every upstream service to embed a Pulsar client, you expose a thin HTTP adapter. This is particularly valuable when integrating with systems that speak HTTP natively: CI/CD pipelines, third-party webhooks, or legacy services that can't easily adopt a new client library.

`pulsar.NewClient` establishes a connection pool, not a single TCP connection. The client maintains persistent connections to the broker and handles reconnection, load balancing across broker nodes, and TLS negotiation transparently. Calling `client.Close()` via `defer` releases those connections when `main` returns; note that deferred calls do not run when the process exits through `log.Fatal`.

`producer.Send` with `context.Background()` is synchronous: it blocks until the broker acknowledges the message or the send fails. The Pulsar client batches outgoing messages by default (configurable via `BatchingMaxMessages` and `BatchingMaxPublishDelay`), which improves throughput under load, most visibly when several goroutines publish through the same producer or when `SendAsync` is used.

For production use, the producer instance should be created once at startup and reused across requests. Creating a new producer per request incurs connection overhead and bypasses the batching optimization entirely.

Component 2: The Consumer

The consumer subscribes to a topic and processes messages with explicit acknowledgment. The subscription type chosen here, `pulsar.Shared`, has meaningful implications for how the system scales.

Go

// client is a pulsar.Client created with pulsar.NewClient, as in the producer.
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "topic-1",
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

// Receive and acknowledge ten messages, then stop.
for i := 0; i < 10; i++ {
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Received message with Id: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
	consumer.Ack(msg)
}

if err := consumer.Unsubscribe(); err != nil {
	log.Fatal(err)
}

Subscription Semantics in Depth

Pulsar's subscription model is one of its most differentiating features.
Here's how the four types behave at the broker level:
  • Exclusive – The broker enforces that only one consumer holds the subscription at any time. A second consumer attempting to subscribe with the same name will receive an error. This guarantees strict message ordering but eliminates horizontal scaling.
  • Shared – The broker distributes messages across all active consumers in round-robin order. Any number of consumers can join or leave the subscription dynamically. This is the right choice for stateless workloads where processing order doesn't matter and throughput is the priority.
  • Failover – The broker designates one consumer as the active receiver. Others remain connected but idle, ready to take over if the active consumer disconnects. This preserves ordering while providing high availability, a pattern common in financial transaction processing.
  • Key_Shared – The broker routes messages with the same key consistently to the same consumer instance. This enables stateful processing (e.g., per-user session aggregation) without external coordination, as long as the consumer count remains stable.

Why Explicit Acknowledgment Matters

`consumer.Ack(msg)` signals to the broker that the message has been durably processed and can be removed from the subscription's cursor. If the consumer process crashes between `Receive` and `Ack`, the broker will redeliver the message to another consumer in the subscription. This is the mechanism behind at-least-once delivery.

For workloads that require exactly-once semantics, Pulsar supports transactional acknowledgment, where the `Ack` and any downstream writes are committed atomically. That's a more advanced topic, but the foundation is the same `Ack` call shown here.

Component 3: The Monitor

The monitor is architecturally the most interesting component. It runs two HTTP servers concurrently, one for the application endpoint and one for Prometheus metrics, and uses a Pulsar producer to generate observable traffic.
Go

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"strconv"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // same broker port as the producer
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Serve the default Prometheus registry, where the Pulsar client registers its collectors.
	http.Handle("/metrics", promhttp.Handler())

	prometheusPort := 2112
	go func() {
		if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
			log.Fatal(err)
		}
	}()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "topic-1",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	ctx := context.Background()
	webPort := 8082
	http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
		msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Payload: []byte("hello-world"),
		})
		if err != nil {
			// Report the failure without terminating the server.
			log.Printf("Publish failed: %v", err)
			http.Error(w, "publish failed", http.StatusInternalServerError)
			return
		}
		log.Printf("Published message: %v", msgId)
		fmt.Fprintf(w, "Message Published: %v", msgId)
	})

	if err := http.ListenAndServe(":"+strconv.Itoa(webPort), nil); err != nil {
		log.Fatal(err)
	}
}

How the Pulsar Client Registers Prometheus Metrics

When `pulsar.NewClient` is called, the Go client automatically registers a set of Prometheus collectors with the default `prometheus.DefaultRegisterer`. No additional instrumentation code is required for the metrics themselves; the only wiring needed is the `promhttp.Handler()` route that exposes the registry over HTTP. The metrics are served at `/metrics` on whatever port `http.DefaultServeMux` is bound to, in this case port `2112`.

The metrics exposed include:
  • `pulsar_client_producers_opened` / `pulsar_client_producers_closed` – producer lifecycle counters
  • `pulsar_client_consumers_opened` / `pulsar_client_consumers_closed` – consumer lifecycle counters
  • `pulsar_client_messages_published_total` – cumulative publish count per topic
  • `pulsar_client_publish_latency_seconds` – histogram of end-to-end publish latency
  • `pulsar_client_bytes_published_total` – total bytes written to the broker

Running the Prometheus metrics server in a goroutine while the main goroutine handles the application HTTP server is idiomatic Go concurrency. Both servers share the same `http.DefaultServeMux`, which is why a single `/metrics` registration is enough: the handler is reachable on the metrics port, and `/produce` is reachable on both ports as well.

Prometheus Scrape Configuration

YAML

scrape_configs:
  - job_name: pulsar-client-go-metrics
    scrape_interval: 10s
    static_configs:
      - targets:
          - localhost:2112

The `scrape_interval: 10s` is a reasonable starting point for development. In production, you would typically align this with your alerting resolution requirements: a 30-second interval is common for dashboards, while 10 seconds or less is appropriate for latency-sensitive alerting rules.

With these metrics flowing into Prometheus, you can build Grafana panels that surface producer throughput, consumer lag, and publish latency percentiles, the three signals that matter most when diagnosing messaging pipeline issues.

Running the Full Pipeline

Prerequisites
  • Apache Pulsar standalone
  • Go 1.18+
  • Prometheus (optional)

Startup Sequence

1. Launch Pulsar in standalone mode:

Shell

bin/pulsar standalone

2. Start the producer service:

Shell

cd producer && go run main.go

3. Start the consumer service:

Shell

cd consumer && go run main.go

4. Start the monitor service:

Shell

cd monitor && go run main.go

5. Start Prometheus:

Shell

prometheus --config.file=prometheus.yml

6. Publish a message via the producer endpoint:

Shell

curl "http://localhost:8080/publish?msg=test_pulsar_message_publish_event"
# msg successfully published

7. Trigger the monitor producer:

Shell

curl http://localhost:8082/produce
# Message Published: (messageId)

8. Inspect raw Prometheus metrics:

Shell

curl http://localhost:2112/metrics | grep pulsar

Engineering Takeaways
  • Decouple publish triggers from client library dependencies. The HTTP-to-Pulsar adapter pattern used in the producer is not just a demo convenience. It is a legitimate architectural boundary.
    Services that need to emit events don't need to know anything about Pulsar's protocol, topic naming, or client configuration. They make an HTTP call; the adapter handles the rest.
  • Match subscription type to processing semantics, not just throughput. A common mistake is defaulting to `Shared` for everything because it scales horizontally. If your processing logic is stateful (for example, aggregating events per user session), `Key_Shared` gives you the same horizontal scalability while preserving per-key ordering without any application-level coordination.
  • Treat the acknowledgment boundary as your consistency boundary. Everything between `Receive` and `Ack` is your processing window. Any side effects (database writes, downstream API calls, cache updates) that happen in this window must be idempotent, because Pulsar will redeliver the message if the consumer fails before acknowledging. Design your processing logic around this constraint from the start, not as an afterthought.
  • Zero-cost observability is a genuine advantage. The fact that `pulsar-client-go` registers Prometheus metrics automatically means you get throughput, latency, and connection health data from the moment your application starts, without writing a single line of instrumentation code. This is a meaningful operational advantage over client libraries that require manual metric registration.

Extending the Demo

The current implementation is intentionally minimal. Here are technically meaningful extensions worth exploring:
  • Schema enforcement – Replace raw `[]byte` payloads with Pulsar's schema-aware producer/consumer API. Using `pulsar.NewAvroSchema` or `pulsar.NewJSONSchema` moves payload validation to the broker, preventing malformed messages from ever reaching consumers.
  • Dead-letter topic routing – Configure `DeadLetterPolicy` on the consumer to automatically route messages that exceed a maximum redelivery count to a separate topic.
    This prevents poison-pill messages from blocking the subscription indefinitely.
  • Producer batching tuning – Set `BatchingMaxMessages`, `BatchingMaxSize`, and `BatchingMaxPublishDelay` on `ProducerOptions` to optimize the throughput/latency trade-off for your specific workload profile.
  • Graceful shutdown – Add `os/signal` handling to flush in-flight messages and close the producer cleanly before the process exits. The current `defer client.Close()` runs only when `main` returns normally; it is skipped when the process is terminated by `SIGINT` or `SIGTERM`, and `SIGKILL` cannot be handled at all.
  • Kubernetes-native deployment – Package each component as a container and deploy using the official Pulsar Helm chart. The producer and monitor can be exposed as Kubernetes Services; the consumer can run as a Deployment with HPA scaling based on the `pulsar_client_messages_published_total` metric exported to a custom metrics adapter.

Conclusion

The `apache-pulsar` project demonstrates that building a production-grade messaging pipeline with Apache Pulsar and Go doesn't require much code. It requires understanding the right abstractions. The producer-consumer-monitor triad covers the three concerns that matter in any event-driven system: getting data in, getting data out, and knowing what's happening in between.

Pulsar's architecture, decoupled storage, flexible subscription semantics, and built-in observability make it a strong candidate for teams that have outgrown simpler messaging systems and need more precise control over delivery guarantees, scaling behavior, and operational visibility.

Source Code

https://github.com/shivik/apache-pulsar-demo

By Shivi Kashyap
Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka

After implementing contract-first integration across three different microservices architectures, I've learned that the biggest bottleneck in distributed systems isn't technical; it's coordination between teams. When Team A waits for Team B to finish their API before starting integration work, you're throwing away weeks of productivity.

Contract-first development flips this model. By defining your integration contracts upfront (OpenAPI specs, Avro schemas, database migrations), you enable teams to work in parallel, catch breaking changes early through CI validation, and treat contracts as the single source of truth. This isn't theoretical; this is how Netflix, Uber, and Amazon scale their engineering organizations.

In this article, I'll show you production-ready contract-first patterns using Java 21, Spring Boot 3, OpenAPI 3, Apache Kafka with Avro, and Flyway migrations. You'll see real code from a working system that handles the three critical integration boundaries: REST APIs, event-driven messaging, and database schemas.

The Problem: Why Traditional Integration Fails at Scale

When systems integrate without contracts, you hit three major problems:

1. Serial development bottlenecks: Team A builds an API endpoint. Team B waits. Team B builds a consumer. Team A discovers the payload doesn't match what Team B expected. Both teams spend days debugging mismatched assumptions.
2. Late discovery of breaking changes: You deploy a service update that changes a response field from customerId to customer_id. Your API consumers break in production. No tests caught it because there was no contract to validate against.
3. Documentation drift: The Swagger docs say the endpoint returns a 201. The actual code returns a 200. The integration tests expect a 404. Nobody knows which one is right because there's no single source of truth.
Contract-first development solves all three by making the contract the authoritative specification that generates code, mocks, tests, and documentation.

What Contract-First Actually Means

Contract-first means you define the integration boundary first (the contract) and then write code that conforms to it. The contract is not an afterthought or generated documentation. It's the design specification.

A complete contract includes:
  • Operations: Endpoints (REST), topics (Kafka), or tables (database)
  • Data shapes: Request/response DTOs, event schemas, column definitions
  • Validation rules: Required fields, constraints, data types
  • Error model: HTTP status codes, error payloads, dead-letter queues
  • Non-functional rules: Idempotency, retries, compatibility policies, SLAs

Here's the mental model: Agree on the contract → generate tools → build independently → let CI enforce alignment.

Contract Type 1: REST API Contracts With OpenAPI

OpenAPI 3 specs are the gold standard for REST API contracts. You define endpoints, request/response schemas, validation rules, and error responses in YAML. Then you generate server stubs, client SDKs, mocks, and documentation from that single source.
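To make "let CI enforce alignment" concrete, here is a deliberately small sketch of the kind of check a contract test performs: compare a response payload against the fields a contract marks as required. The class name, the `REQUIRED` set, and the sample payload are all hypothetical; real pipelines would typically rely on an OpenAPI validator or a contract-testing tool rather than hand-written checks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical CI-style check: which contract-required fields are absent from a payload?
class ContractCheckSketch {
    // Required response fields as a contract might declare them (illustrative subset).
    public static final Set<String> REQUIRED = Set.of("orderId", "customerId", "status");

    // Returns the missing required field names in alphabetical order.
    public static List<String> missingRequired(Set<String> required, Map<String, ?> payload) {
        List<String> missing = new ArrayList<>();
        for (String field : new TreeSet<>(required)) {
            if (!payload.containsKey(field)) {
                missing.add(field);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The breaking change described earlier: customerId was renamed to customer_id.
        Map<String, Object> renamed = Map.of(
                "orderId", "ORD-1",
                "customer_id", "CUST-123",
                "status", "CREATED");
        System.out.println(missingRequired(REQUIRED, renamed)); // prints [customerId]
    }
}
```

A build step that fails whenever this list is non-empty would surface the rename before deployment instead of in production.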
OpenAPI Contract Example

Here's a production OpenAPI contract for an order management API:

YAML

openapi: 3.2.0
info:
  title: Orders API
  version: 1.0.0
  description: Contract-first REST API for order management
paths:
  /v1/orders:
    post:
      operationId: createOrder
      summary: Create a new order
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/CreateOrderRequest'
      responses:
        '201':
          description: Order created successfully
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/OrderResponse'
        '400':
          description: Validation error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
        '409':
          description: Idempotency conflict
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
  /v1/orders/{orderId}:
    get:
      operationId: getOrder
      parameters:
        - name: orderId
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Order found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/OrderResponse'
        '404':
          description: Order not found
components:
  schemas:
    CreateOrderRequest:
      type: object
      required: [customerId, items]
      properties:
        customerId:
          type: string
          example: CUST-123
        idempotencyKey:
          type: string
          description: Optional key for safe retries
        items:
          type: array
          minItems: 1
          items:
            $ref: '#/components/schemas/OrderItem'
    OrderItem:
      type: object
      required: [sku, quantity]
      properties:
        sku:
          type: string
          example: SKU-001
        quantity:
          type: integer
          minimum: 1
    OrderResponse:
      type: object
      required: [orderId, customerId, status, items, timestamp]
      properties:
        orderId:
          type: string
        customerId:
          type: string
        status:
          type: string
          enum: [CREATED, REJECTED]
        items:
          type: array
          items:
            $ref: '#/components/schemas/OrderItem'
        timestamp:
          type: string
          format: date-time
    ErrorResponse:
      type: object
      required: [code, message, traceId, timestamp]
      properties:
        code:
          type: string
          enum: [VALIDATION_ERROR, NOT_FOUND, CONFLICT, INTERNAL_ERROR]
        message:
          type: string
        traceId:
          type: string
        timestamp:
          type: string
          format: date-time

Implementing the Provider Side (Spring Boot)

The contract drives the implementation. Your Spring Boot controller implements what the contract specifies:

Java

@RestController
@RequestMapping("/v1/orders")
@RequiredArgsConstructor
@Slf4j
public class OrderController {

    private final OrderService orderService;

    /**
     * POST /v1/orders
     * Contract: contracts/openapi/orders-api.v1.yaml
     */
    @PostMapping
    public ResponseEntity<OrderResponse> createOrder(
            @Valid @RequestBody CreateOrderRequest request) {
        log.info("Creating order for customer: {}", request.customerId());
        OrderResponse response = orderService.createOrder(request);
        return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(response);
    }

    /**
     * GET /v1/orders/{orderId}
     */
    @GetMapping("/{orderId}")
    public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) {
        OrderResponse response = orderService.getOrder(orderId)
                .orElseThrow(() -> new ResourceNotFoundException("Order not found: " + orderId));
        return ResponseEntity.ok(response);
    }
}

Critical implementation details:
  • DTOs match contract schemas exactly: CreateOrderRequest and OrderResponse are Java records generated from or validated against the OpenAPI spec
  • HTTP status codes match contract: 201 for creation, 404 for not found, 409 for idempotency conflicts
  • Validation is enforced: @Valid annotation ensures request validation matches OpenAPI constraints
  • Error responses are standardized: All errors return ErrorResponse with consistent structure

Consumer Parallel Development

Here's where contract-first shines. While your team implements the provider, the consumer team can:
  • Generate a Java client from the OpenAPI spec using openapi-generator
  • Run a mock server that returns valid responses based on the contract
  • Write integration tests against the mock
  • Switch to the real service when it's ready (no code changes needed)

The consumer doesn't wait for you to finish. They develop in parallel.
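As a rough picture of the "mock server" step, the sketch below stands up a stub for POST /v1/orders with the JDK's built-in HTTP server and calls it with the JDK HTTP client. Everything here is an assumption made for illustration: the class name, the canned JSON body (shaped like the contract's OrderResponse), and the hand-written stub itself. In practice a consumer team would more likely generate the mock from the OpenAPI file with dedicated tooling.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-rolled stub of the Orders API for consumer-side tests.
class OrdersStubSketch {
    // Canned response shaped like the contract's OrderResponse schema.
    public static final String CANNED_ORDER =
            "{\"orderId\":\"ORD-1\",\"customerId\":\"CUST-123\",\"status\":\"CREATED\","
            + "\"items\":[{\"sku\":\"SKU-001\",\"quantity\":1}],"
            + "\"timestamp\":\"2024-01-01T00:00:00Z\"}";

    // Starts the stub on an ephemeral port; read it via server.getAddress().getPort().
    public static HttpServer startStub() throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/v1/orders", exchange -> {
            exchange.getRequestBody().readAllBytes(); // drain the request body
            if (!"POST".equals(exchange.getRequestMethod())) {
                exchange.sendResponseHeaders(405, -1); // no body for unsupported methods
                exchange.close();
                return;
            }
            byte[] body = CANNED_ORDER.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(201, body.length); // 201, as the contract specifies
            try (var out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Consumer-side call; only the base URL changes when the real service is ready.
    public static HttpResponse<String> createOrder(int port) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/v1/orders"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"customerId\":\"CUST-123\",\"items\":[{\"sku\":\"SKU-001\",\"quantity\":1}]}"))
                .build();
        return HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = startStub();
        try {
            HttpResponse<String> response = createOrder(server.getAddress().getPort());
            System.out.println(response.statusCode() + " " + response.body());
        } finally {
            server.stop(0);
        }
    }
}
```

The consumer's tests depend only on the status code and body shape, which is exactly what the contract pins down.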
Contract Type 2: Event Contracts With Kafka and Avro Event-driven systems need two contract layers: topic semantics (human-readable) and schema definitions (machine-validated). Topic Semantics Contract Document the operational contract for each topic: Markdown ## Topic: orders.order-created.v1 - **Purpose**: Emitted when an order is created successfully - **Key**: orderId (partition affinity per order) - **Delivery**: At-least-once (consumers must be idempotent) - **Consumer requirement**: Deduplicate by eventId - **Retry policy**: Consumer retries transient errors - **DLQ**: orders.order-created.v1.dlq for poison messages - **Compatibility**: Backward compatible schema evolution required Avro Schema Contract The Avro schema is your machine-validated contract: Java { "type": "record", "name": "OrderCreated", "namespace": "com.acme.events", "doc": "Event emitted when an order is successfully created", "fields": [ { "name": "eventId", "type": "string", "doc": "Unique event ID for idempotent processing" }, { "name": "occurredAt", "type": "string", "doc": "ISO 8601 timestamp" }, { "name": "orderId", "type": "string" }, { "name": "customerId", "type": "string" }, { "name": "source", "type": ["null", "string"], "default": null, "doc": "Order source (WEB, MOBILE, API). Nullable for backward compatibility." }, { "name": "items", "type": { "type": "array", "items": { "type": "record", "name": "OrderItem", "fields": [ {"name": "sku", "type": "string"}, {"name": "quantity", "type": "int"} ] } } } ] } Key pattern: The source field is nullable with a default value. This supports backward-compatible evolution; old consumers can read new events, and new consumers can read old events. 
Kafka Producer Implementation

Java
    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class OrderEventPublisher {

        private final KafkaTemplate<String, Object> kafkaTemplate;

        public void publishOrderCreated(OrderCreated event) {
            String key = event.getOrderId();
            log.debug("Publishing OrderCreated: orderId={}, eventId={}", key, event.getEventId());

            CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("orders.order-created.v1", key, event);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    log.info("Published OrderCreated: orderId={}, partition={}, offset={}",
                        key,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                } else {
                    log.error("Failed to publish OrderCreated: orderId={}", key, ex);
                }
            });
        }
    }

Production concerns addressed:
  • Key-based partitioning: Using orderId as the key ensures all events for the same order go to the same partition, maintaining ordering
  • Async publishing with callbacks: Non-blocking publish with explicit success/failure handling
  • Structured logging: Captures partition and offset for troubleshooting

Kafka Consumer With Idempotency

At-least-once delivery means duplicates are possible. Consumers must deduplicate:

Java
    @KafkaListener(topics = "orders.order-created.v1", groupId = "billing-service")
    public void onOrderCreated(OrderCreated event) {
        // Check if already processed
        if (processedEventsRepository.existsByEventId(event.getEventId())) {
            log.debug("Skipping duplicate event: {}", event.getEventId());
            return;
        }

        // Process event
        billingService.createInvoice(
            event.getOrderId(),
            event.getCustomerId(),
            event.getItems()
        );

        // Mark as processed
        processedEventsRepository.save(
            new ProcessedEvent(event.getEventId(), Instant.now())
        );
    }

Idempotency pattern: Check eventId before processing, store it after processing. If the same event arrives twice, the second one is ignored.
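The check-then-store pattern leaves a small window in which two concurrent deliveries of the same event can both pass the check. One common variation, sketched here in Python with an in-memory SQLite database standing in for the processed-events store, claims the eventId through a primary-key insert inside the same transaction as the side effect. The table names and the make_store and handle functions are hypothetical names for illustration and are not taken from the article's repository.

```python
import sqlite3


def make_store() -> sqlite3.Connection:
    """Create an in-memory database with a dedupe table and a toy invoices table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE invoices (order_id TEXT PRIMARY KEY)")
    conn.commit()
    return conn


def handle(conn: sqlite3.Connection, event: dict) -> bool:
    """Process an event once. Returns True if processed, False if it was a duplicate."""
    # "with conn" commits on success and rolls back if an exception escapes,
    # so the claim and the side effect succeed or fail together.
    with conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event["eventId"],),
        )
        if cur.rowcount == 0:
            # The primary key already existed: this delivery is a duplicate.
            return False
        conn.execute(
            "INSERT INTO invoices (order_id) VALUES (?)",
            (event["orderId"],),
        )
    return True
```

Because the dedupe record and the invoice are written in one transaction, a failure while creating the invoice also rolls back the claim, and the redelivered event is processed normally.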
After implementing contract-first integration across three different microservices architectures, I've learned that the biggest bottleneck in distributed systems isn't technical; it's coordination between teams. When Team A waits for Team B to finish their API before starting integration work, you're throwing away weeks of productivity.

Contract-first development flips this model. By defining your integration contracts upfront (OpenAPI specs, Avro schemas, database migrations), you enable teams to work in parallel, catch breaking changes early through CI validation, and treat contracts as the single source of truth. This isn't theoretical; this is how Netflix, Uber, and Amazon scale their engineering organizations.

In this article, I'll show you production-ready contract-first patterns using Java 21, Spring Boot 3, OpenAPI 3, Apache Kafka with Avro, and Flyway migrations. You'll see real code from a working system that handles the three critical integration boundaries: REST APIs, event-driven messaging, and database schemas.

The Problem: Why Traditional Integration Fails at Scale

When systems integrate without contracts, you hit three major problems:

1. Serial development bottlenecks: Team A builds an API endpoint. Team B waits. Team B builds a consumer. Team A discovers the payload doesn't match what Team B expected. Both teams spend days debugging mismatched assumptions.
2. Late discovery of breaking changes: You deploy a service update that changes a response field from customerId to customer_id. Your API consumers break in production. No tests caught it because there was no contract to validate against.
3. Documentation drift: The Swagger docs say the endpoint returns a 201. The actual code returns a 200. The integration tests expect a 404. Nobody knows which one is right because there's no single source of truth.
Contract-first development solves all three by making the contract the authoritative specification that generates code, mocks, tests, and documentation.

What Contract-First Actually Means

Contract-first means you define the integration boundary first (the contract) and then write code that conforms to it. The contract is not an afterthought or generated documentation. It's the design specification.

A complete contract includes:
  • Operations: Endpoints (REST), topics (Kafka), or tables (database)
  • Data shapes: Request/response DTOs, event schemas, column definitions
  • Validation rules: Required fields, constraints, data types
  • Error model: HTTP status codes, error payloads, dead-letter queues
  • Non-functional rules: Idempotency, retries, compatibility policies, SLAs

Here's the mental model: Agree on the contract → generate tools → build independently → let CI enforce alignment.

Contract Type 1: REST API Contracts With OpenAPI

OpenAPI 3 specs are the gold standard for REST API contracts. You define endpoints, request/response schemas, validation rules, and error responses in YAML. Then you generate server stubs, client SDKs, mocks, and documentation from that single source.
OpenAPI Contract Example

Here's a production OpenAPI contract for an order management API:

YAML
    openapi: 3.2.0
    info:
      title: Orders API
      version: 1.0.0
      description: Contract-first REST API for order management
    paths:
      /v1/orders:
        post:
          operationId: createOrder
          summary: Create a new order
          requestBody:
            required: true
            content:
              application/json:
                schema:
                  $ref: '#/components/schemas/CreateOrderRequest'
          responses:
            '201':
              description: Order created successfully
              content:
                application/json:
                  schema:
                    $ref: '#/components/schemas/OrderResponse'
            '400':
              description: Validation error
              content:
                application/json:
                  schema:
                    $ref: '#/components/schemas/ErrorResponse'
            '409':
              description: Idempotency conflict
              content:
                application/json:
                  schema:
                    $ref: '#/components/schemas/ErrorResponse'
      /v1/orders/{orderId}:
        get:
          operationId: getOrder
          parameters:
            - name: orderId
              in: path
              required: true
              schema:
                type: string
          responses:
            '200':
              description: Order found
              content:
                application/json:
                  schema:
                    $ref: '#/components/schemas/OrderResponse'
            '404':
              description: Order not found
    components:
      schemas:
        CreateOrderRequest:
          type: object
          required: [customerId, items]
          properties:
            customerId:
              type: string
              example: CUST-123
            idempotencyKey:
              type: string
              description: Optional key for safe retries
            items:
              type: array
              minItems: 1
              items:
                $ref: '#/components/schemas/OrderItem'
        OrderItem:
          type: object
          required: [sku, quantity]
          properties:
            sku:
              type: string
              example: SKU-001
            quantity:
              type: integer
              minimum: 1
        OrderResponse:
          type: object
          required: [orderId, customerId, status, items, timestamp]
          properties:
            orderId:
              type: string
            customerId:
              type: string
            status:
              type: string
              enum: [CREATED, REJECTED]
            items:
              type: array
              items:
                $ref: '#/components/schemas/OrderItem'
            timestamp:
              type: string
              format: date-time
        ErrorResponse:
          type: object
          required: [code, message, traceId, timestamp]
          properties:
            code:
              type: string
              enum: [VALIDATION_ERROR, NOT_FOUND, CONFLICT, INTERNAL_ERROR]
            message:
              type: string
            traceId:
              type: string
            timestamp:
              type: string
              format: date-time

Implementing the Provider Side (Spring Boot)

The contract drives the implementation. Your Spring Boot controller implements what the contract specifies:

Java
    @RestController
    @RequestMapping("/v1/orders")
    @RequiredArgsConstructor
    @Slf4j
    public class OrderController {

        private final OrderService orderService;

        /**
         * POST /v1/orders
         * Contract: contracts/openapi/orders-api.v1.yaml
         */
        @PostMapping
        public ResponseEntity<OrderResponse> createOrder(
                @Valid @RequestBody CreateOrderRequest request) {
            log.info("Creating order for customer: {}", request.customerId());
            OrderResponse response = orderService.createOrder(request);
            return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(response);
        }

        /**
         * GET /v1/orders/{orderId}
         */
        @GetMapping("/{orderId}")
        public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) {
            OrderResponse response = orderService.getOrder(orderId)
                .orElseThrow(() -> new ResourceNotFoundException("Order not found: " + orderId));
            return ResponseEntity.ok(response);
        }
    }

Critical implementation details:
  • DTOs match contract schemas exactly: CreateOrderRequest and OrderResponse are Java records generated from or validated against the OpenAPI spec
  • HTTP status codes match contract: 201 for creation, 404 for not found, 409 for idempotency conflicts
  • Validation is enforced: @Valid annotation ensures request validation matches OpenAPI constraints
  • Error responses are standardized: All errors return ErrorResponse with consistent structure

Consumer Parallel Development

Here's where contract-first shines. While your team implements the provider, the consumer team can:
  • Generate a Java client from the OpenAPI spec using openapi-generator
  • Run a mock server that returns valid responses based on the contract
  • Write integration tests against the mock
  • Switch to the real service when it's ready (no code changes needed)

The consumer doesn't wait for you to finish. They develop in parallel.
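To illustrate what "validated against the contract" can look like before the provider exists, here is a minimal hand-written sketch in Python that checks a request payload against the CreateOrderRequest rules stated in the contract (required customerId and items, at least one item, quantity of at least 1). In practice a generated client or a JSON Schema validator would do this work; the function name validate_create_order and the error message format are assumptions made for this example.

```python
def validate_create_order(payload: dict) -> list[str]:
    """Return a list of contract violations for a CreateOrderRequest payload."""
    errors: list[str] = []

    # required: [customerId, items]
    if not isinstance(payload.get("customerId"), str):
        errors.append("customerId: required string")

    items = payload.get("items")
    if not isinstance(items, list) or len(items) < 1:
        # items: type array, minItems 1
        errors.append("items: required array with at least 1 element")
        return errors

    for i, item in enumerate(items):
        if not isinstance(item, dict):
            errors.append(f"items[{i}]: must be an object")
            continue
        # OrderItem required: [sku, quantity]
        if not isinstance(item.get("sku"), str):
            errors.append(f"items[{i}].sku: required string")
        qty = item.get("quantity")
        # bool is a subclass of int in Python, so exclude it explicitly
        if not isinstance(qty, int) or isinstance(qty, bool) or qty < 1:
            errors.append(f"items[{i}].quantity: required integer >= 1")

    return errors
```

A consumer test suite could run its request fixtures through a check like this, so a payload that drifts from the contract fails locally rather than at integration time.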
Contract Type 2: Event Contracts With Kafka and Avro

Event-driven systems need two contract layers: topic semantics (human-readable) and schema definitions (machine-validated).

Topic Semantics Contract

Document the operational contract for each topic:

Markdown
    ## Topic: orders.order-created.v1
    - **Purpose**: Emitted when an order is created successfully
    - **Key**: orderId (partition affinity per order)
    - **Delivery**: At-least-once (consumers must be idempotent)
    - **Consumer requirement**: Deduplicate by eventId
    - **Retry policy**: Consumer retries transient errors
    - **DLQ**: orders.order-created.v1.dlq for poison messages
    - **Compatibility**: Backward compatible schema evolution required

Avro Schema Contract

The Avro schema is your machine-validated contract:

JSON
    {
      "type": "record",
      "name": "OrderCreated",
      "namespace": "com.acme.events",
      "doc": "Event emitted when an order is successfully created",
      "fields": [
        {
          "name": "eventId",
          "type": "string",
          "doc": "Unique event ID for idempotent processing"
        },
        {
          "name": "occurredAt",
          "type": "string",
          "doc": "ISO 8601 timestamp"
        },
        { "name": "orderId", "type": "string" },
        { "name": "customerId", "type": "string" },
        {
          "name": "source",
          "type": ["null", "string"],
          "default": null,
          "doc": "Order source (WEB, MOBILE, API). Nullable for backward compatibility."
        },
        {
          "name": "items",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "OrderItem",
              "fields": [
                {"name": "sku", "type": "string"},
                {"name": "quantity", "type": "int"}
              ]
            }
          }
        }
      ]
    }

Key pattern: The source field is nullable with a default value. This supports backward-compatible evolution; old consumers can read new events, and new consumers can read old events.
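The effect of that default can be shown with a simplified sketch of Avro's record resolution rule, written in plain Python without an Avro library: a reader field missing from the written data takes its default, a reader field with no default and no data is an error, and written fields the reader does not know are ignored. The resolve function and the trimmed field lists below are assumptions made for illustration; real deserializers also handle types, unions, and nested records.

```python
def resolve(written: dict, reader_fields: list[dict]) -> dict:
    """Project a written record onto the reader's fields, applying defaults."""
    out = {}
    for field in reader_fields:
        name = field["name"]
        if name in written:
            out[name] = written[name]
        elif "default" in field:
            # Field added after the event was written: fall back to the default.
            out[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for field '{name}'")
    # Fields present in `written` but unknown to the reader are dropped.
    return out


# Hypothetical, trimmed-down versions of the OrderCreated field list.
OLD_FIELDS = [{"name": "eventId"}, {"name": "orderId"}]
NEW_FIELDS = OLD_FIELDS + [{"name": "source", "default": None}]

old_event = {"eventId": "evt-1", "orderId": "ORD-1"}
new_event = {"eventId": "evt-2", "orderId": "ORD-2", "source": "WEB"}

new_reader_old_event = resolve(old_event, NEW_FIELDS)  # source filled with None
old_reader_new_event = resolve(new_event, OLD_FIELDS)  # source ignored
```

Had source been added without a default, the first call would raise instead, which is the kind of incompatibility a schema compatibility check is meant to reject.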
Kafka Producer Implementation

Java
    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class OrderEventPublisher {

        private final KafkaTemplate<String, Object> kafkaTemplate;

        public void publishOrderCreated(OrderCreated event) {
            String key = event.getOrderId();
            log.debug("Publishing OrderCreated: orderId={}, eventId={}", key, event.getEventId());

            CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("orders.order-created.v1", key, event);

            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    log.info("Published OrderCreated: orderId={}, partition={}, offset={}",
                        key,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                } else {
                    log.error("Failed to publish OrderCreated: orderId={}", key, ex);
                }
            });
        }
    }

Architecture Diagram: Contract-First Flow

Figure 1: Contract-first development flow showing parallel team development

Note: Solid arrows represent generation or implementation flow. Dashed arrows represent validation relationships.

Contract Type 3: Database Contracts With Flyway

Database schemas are contracts, too. Flyway migrations let you version and evolve schemas with the same contract-first approach.
Base Schema Migration

File: V1__create_orders.sql

SQL
    CREATE TABLE orders (
        id VARCHAR(32) PRIMARY KEY,
        customer_id VARCHAR(32) NOT NULL,
        status VARCHAR(16) NOT NULL,
        created_at TIMESTAMP NOT NULL DEFAULT now()
    );

    CREATE TABLE order_items (
        order_id VARCHAR(32) NOT NULL REFERENCES orders(id),
        sku VARCHAR(64) NOT NULL,
        quantity INT NOT NULL CHECK (quantity > 0),
        PRIMARY KEY (order_id, sku)
    );

    CREATE INDEX idx_orders_customer_id ON orders(customer_id);

Schema Evolution: Expand/Migrate/Contract

When you need to add a field without breaking old code, use the expand/migrate/contract pattern:

Expand (V2__add_order_source.sql):

SQL
    ALTER TABLE orders ADD COLUMN source VARCHAR(32);

Migrate (application code):

SQL
    -- Backfill for existing rows
    UPDATE orders SET source = 'UNKNOWN' WHERE source IS NULL;

Contract (V3__enforce_source_not_null.sql):

SQL
    -- After all systems produce source
    ALTER TABLE orders ALTER COLUMN source SET NOT NULL;

This three-phase approach lets you deploy schema changes without downtime.

CI/CD Enforcement: Making Contracts Real

Contracts only work if they're enforced.
Here are the CI gates that prevent breaking changes:

REST API: OpenAPI Breaking Change Detection

Run openapi-diff in CI to catch breaking changes:

YAML
    # .github/workflows/api-contract-check.yml
    - name: Check for API breaking changes
      run: |
        npx openapi-diff \
          main:contracts/openapi/orders-api.v1.yaml \
          HEAD:contracts/openapi/orders-api.v1.yaml \
          --fail-on-breaking

This fails the build if you:
  • Remove required fields
  • Change field types
  • Remove endpoints
  • Change HTTP status codes

Kafka: Schema Registry Compatibility Check

Point producers at Schema Registry so that every new schema version is checked against the subject's compatibility level (BACKWARD is the registry default):

Properties files
    # Schema Registry configuration
    confluent.schema.registry.url=http://schema-registry:8081
    spring.kafka.properties.schema.registry.url=http://schema-registry:8081

    # Enforce backward compatibility
    spring.kafka.producer.properties.auto.register.schemas=true
    spring.kafka.producer.properties.use.latest.version=true

When a producer tries to register an incompatible schema, the registry rejects it and the publish fails, so the problem surfaces in a test or staging environment before reaching production.

Database: Flyway Validation

Flyway validates migrations on startup:

Properties files
    spring.flyway.validate-on-migrate=true
    spring.flyway.baseline-on-migrate=false

If someone edits a migration that has already been applied, Flyway detects the checksum mismatch and fails the deployment. Changes made by hand directly in the database are outside what this check compares.
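To make the OpenAPI gate above less abstract, the following Python sketch shows the core idea behind a breaking-change check: compare the old and new specs, already parsed into dictionaries, and report anything that was removed. It is a deliberately small illustration covering only removed paths, operations, and response codes; it is not how openapi-diff is implemented, and the function name find_breaking_changes is an assumption made for this example.

```python
HTTP_METHODS = {"get", "put", "post", "delete", "patch", "head", "options", "trace"}


def find_breaking_changes(old: dict, new: dict) -> list[str]:
    """List removals between two parsed OpenAPI documents."""
    problems: list[str] = []
    new_paths = new.get("paths", {})

    for path, old_item in old.get("paths", {}).items():
        if path not in new_paths:
            problems.append(f"removed path {path}")
            continue
        for method, old_op in old_item.items():
            if method not in HTTP_METHODS:
                continue  # skip non-operation keys such as "parameters"
            new_op = new_paths[path].get(method)
            if new_op is None:
                problems.append(f"removed operation {method.upper()} {path}")
                continue
            old_codes = set(old_op.get("responses", {}))
            new_codes = set(new_op.get("responses", {}))
            for code in sorted(old_codes - new_codes):
                problems.append(f"removed response {code} from {method.upper()} {path}")

    return problems
```

A CI job built on this idea would load the spec from the base branch and from the pull request, then fail the build when the returned list is not empty.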
Integration Flow Diagram

Figure 2: End-to-end flow showing REST → DB → Kafka integration with contracts enforced at each boundary

Real-World Benefits: Production Implementation Experience

After implementing contract-first across multiple services (orders, billing, inventory), teams consistently observe significant improvements:

Integration quality:
  • Substantial reduction in integration bugs reaching production
  • Most remaining bugs are business logic issues, not contract mismatches
  • Clear contracts eliminate ambiguity in integration expectations

Development velocity:
  • Integration cycles measured in weeks rather than months
  • Consumer teams start integration work immediately instead of waiting for provider completion
  • Mock servers enable realistic integration testing without coordination delays

Operational reliability:
  • CI-enforced contract validation prevents breaking changes from reaching production
  • Breaking changes caught during PR review, not after deployment
  • Automated validation ensures compatibility before merge

Documentation accuracy:
  • Documentation generated from OpenAPI specs stays synchronized with implementation by design
  • Swagger UI always reflects actual API behavior as both derive from the same contract
  • No manual documentation maintenance required

Common Pitfalls and How to Avoid Them

Pitfall 1: Treating Contracts as Documentation
  • Wrong approach: Write code first, generate OpenAPI from annotations later.
  • Right approach: Write OpenAPI first, generate server stubs, or validate implementation against it.

Pitfall 2: Skipping CI Validation
  • Wrong approach: Trust developers to manually check compatibility.
  • Right approach: Automate breaking change detection in CI. Make it a required check.

Pitfall 3: No Schema Evolution Strategy
  • Wrong approach: Add fields without defaults, breaking old consumers.
  • Right approach: All new fields must be optional or have defaults. Test with multiple schema versions.
Pitfall 4: Ignoring Idempotency
  • Wrong approach: Assume exactly-once delivery in Kafka.
  • Right approach: Design consumers to deduplicate by eventId. Assume at-least-once.

Pitfall 5: Coupling Contracts to Implementation
  • Wrong approach: Expose internal database schema directly in API contracts.
  • Right approach: Design contracts for consumers, not internal convenience. Use DTOs that map between the external contract and the internal model.

When Contract-First Makes Sense

Contract-first isn't always the answer. Use it when:
  ✅ Multiple teams integrate: The coordination cost justifies the upfront contract design effort
  ✅ Public APIs or partner integrations: External consumers need stability and clear documentation
  ✅ Microservices architecture: Services must evolve independently without breaking dependents
  ✅ High change frequency: CI validation catches breaking changes early when changes are frequent

Skip contract-first when:
  ❌ Single small team: Coordination overhead is low, so formal contracts add friction
  ❌ Prototyping: You're exploring the problem space and expect major pivots
  ❌ Internal tools with one consumer: The provider and consumer are maintained by the same person

Key Takeaways
  • Contracts enable parallel development: Provider and consumer teams work simultaneously instead of serially
  • CI validation prevents breaking changes: Automate OpenAPI diffs, schema compatibility checks, and migration validation
  • Idempotency is not optional: At-least-once delivery in Kafka requires consumers to deduplicate by event ID
  • Schema evolution requires backward compatibility: Use nullable fields with defaults to support gradual rollouts
  • Contracts are design specifications, not afterthoughts: Define contracts first, generate code from them

What's Next?
If you're implementing contract-first integration:

1. Start with one integration: pick your most painful cross-team dependency
2. Write the OpenAPI spec or Avro schema first, before any implementation
3. Set up CI validation for that contract (openapi-diff or Schema Registry)
4. Measure reduction in integration bugs and coordination overhead
5. Expand to other integrations once you've proven the pattern

Contract-first isn't a silver bullet, but it transforms integration from a coordination nightmare into a governance problem that CI can solve. When you're coordinating three teams across two time zones, that shift makes the difference between shipping in weeks versus months.

Full source code: github.com/wallaceespindola/contract-first-integrations

Related reading:
  • OpenAPI 3.0 Specification: spec.openapis.org
  • Confluent Schema Registry: docs.confluent.io/platform/current/schema-registry
  • Flyway Documentation: flywaydb.org/documentation
  • Mastering Contract-First API Development: moesif.com/blog
  • Research on Microservices Issues: arxiv.org

Need more tech insights? Check out my GitHub, LinkedIn, and Speaker Deck. Happy coding!

By Wallace Espindola
Building a Zero-Cost Approval Workflow With AWS Lambda Durable Functions

When AWS announced Lambda Durable Functions at re:Invent 2025, my first reaction was, "Okay, but how is this different from Step Functions?" I have been building serverless workflows on AWS for a while now, and Step Functions has always been my go-to service for orchestrating multi-step pipelines. So naturally, I wanted to put this new capability to the test.

I decided to build a simple document processing workflow, an ETL pipeline with human-in-the-loop approval, using both Durable Functions and Step Functions, and then run 1,000 actual document processing workflows through each system. What I found surprised me. Not just the cost difference (79% cheaper with Durable Functions), but the trade-offs that nobody is really talking about yet.

In this tutorial, I will walk you through building a zero-cost approval workflow using Lambda Durable Functions with Python. Along the way, I will share the actual cost numbers and the lessons that would have saved me a few hours of debugging.

The Problem: Approval Workflows Are Expensive

If you have ever built a document processing system that requires human approval, you know the pain. Someone uploads a file, your system processes it, and then... it sits there. Waiting for a human to review and approve it. That wait can be 5 minutes, 20 minutes, or even hours.

Traditional approaches to handling this waiting are:
  • Polling: Your code keeps checking every 30 seconds ("Is it approved yet? How about now?"), making those calls the entire time.
  • Always-on server: An EC2 instance or ECS container sits idle, costing you money 24/7, just to catch that one approval event.
  • External state management: You build a custom solution with DynamoDB, SQS, and Lambda triggers. It works fine, but it requires you to maintain a state machine you built yourself.

What if your workflow could just... pause? No compute charges. No polling. Just pause, wait for the human to do their thing, and resume exactly where it left off.
That is exactly what Lambda Durable Functions enables with the wait_for_callback pattern.

What We Are Building

Here is the workflow we will implement:

Extract data → Transform data → Load data → Wait for approval (≈20 min) → Finalize & archive

A CSV file gets uploaded to an S3 bucket under the uploads/ prefix. Our durable function picks it up, runs it through three ETL steps (extract, transform, load), then pauses execution and waits for a human to approve the processed data through a shared approval API. Once approved, the function resumes, finalizes the job, and archives the file.

The key part? During that 20-minute (or 2-hour, or 2-day) approval wait, you pay absolutely nothing for compute.

Architecture Overview

The project uses three separate SAM stacks:

    shared-resources/    # Approval API, DynamoDB, SNS (shared by both systems)
    durable-functions/   # Lambda Durable Functions ETL pipeline
    step-functions/      # Step Functions ETL pipeline (for comparison)

The shared approval handler serves both workflow types through a single API. When a job comes in for approval, it checks the workflowType field: if it is durable-functions, it calls send_durable_execution_callback_success; if it is step-functions, it calls send_task_success. Same API endpoint, different callback mechanisms under the hood.

Prerequisites

Before we begin, make sure you have the following:
  • AWS SAM CLI (latest version recommended)
  • Python 3.14 runtime
  • AWS account with Lambda, DynamoDB, S3, SNS, and API Gateway access
  • Docker for local Lambda testing

Check your SAM CLI version:

Shell
    sam --version

Step 1: Deploy Shared Resources First

Before the ETL pipeline, we need the shared infrastructure: the approval API, a DynamoDB table for pending approvals, and an SNS topic for notifications.
Here is the shared-resources/ SAM template:

YAML
    # shared-resources/template.yaml
    AWSTemplateFormatVersion: '2010-09-09'
    Transform: AWS::Serverless-2016-10-31
    Description: Shared resources for ETL approval workflow

    Parameters:
      ApproverEmail:
        Type: String
        Description: Email address to receive approval notifications
        Default: [email protected]

    Resources:
      PendingApprovalsTable:
        Type: AWS::DynamoDB::Table
        Properties:
          TableName: etl-pending-approvals
          BillingMode: PAY_PER_REQUEST
          AttributeDefinitions:
            - AttributeName: jobId
              AttributeType: S
          KeySchema:
            - AttributeName: jobId
              KeyType: HASH
          TimeToLiveSpecification:
            AttributeName: ttl
            Enabled: true

      ApprovalNotificationTopic:
        Type: AWS::SNS::Topic
        Properties:
          TopicName: etl-approval-notifications
          Subscription:
            - Endpoint: !Ref ApproverEmail
              Protocol: email

      ApprovalApi:
        Type: AWS::Serverless::Api
        Properties:
          Name: ETL-Approval-API
          StageName: prod

      ApprovalHandlerFunction:
        Type: AWS::Serverless::Function
        Properties:
          FunctionName: ETL-Approval-Handler
          CodeUri: ./src
          Handler: approval_handler.handler
          Runtime: python3.14
          MemorySize: 256
          Timeout: 30
          Environment:
            Variables:
              APPROVALS_TABLE: !Ref PendingApprovalsTable
          Policies:
            - DynamoDBCrudPolicy:
                TableName: !Ref PendingApprovalsTable
            - Version: '2012-10-17'
              Statement:
                - Effect: Allow
                  Action:
                    - states:SendTaskSuccess
                    - states:SendTaskFailure
                    - lambda:SendDurableExecutionCallbackSuccess
                    - lambda:SendDurableExecutionCallbackFailure
                  Resource: '*'
          Events:
            ApproveJob:
              Type: Api
              Properties:
                RestApiId: !Ref ApprovalApi
                Path: /approve/{jobId}
                Method: POST
            RejectJob:
              Type: Api
              Properties:
                RestApiId: !Ref ApprovalApi
                Path: /reject/{jobId}
                Method: POST
            GetJobStatus:
              Type: Api
              Properties:
                RestApiId: !Ref ApprovalApi
                Path: /status/{jobId}
                Method: GET

Notice the approval handler has permissions for both states:SendTaskSuccess (Step Functions) and lambda:SendDurableExecutionCallbackSuccess (Durable Functions). This is the shared handler approach, one API that works with both workflow types.
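The routing inside such a shared handler can be sketched in a few lines of Python. To avoid guessing at SDK call signatures, the two callback senders are passed in as plain callables; in the real handler they would wrap the Lambda and Step Functions calls named in the article. The function route_approval and the record fields callbackId and taskToken are hypothetical names chosen for this sketch, while workflowType and its two values come from the article.

```python
from typing import Callable

# A sender takes the stored token or callback ID plus the approval result.
Sender = Callable[[str, dict], None]


def route_approval(job: dict, result: dict,
                   send_durable: Sender, send_step_functions: Sender) -> str:
    """Resume the waiting workflow using the mechanism that matches its type."""
    workflow_type = job.get("workflowType")
    if workflow_type == "durable-functions":
        # Assumed field: the callback ID stored when the workflow paused.
        send_durable(job["callbackId"], result)
    elif workflow_type == "step-functions":
        # Assumed field: the task token stored by the Step Functions workflow.
        send_step_functions(job["taskToken"], result)
    else:
        raise ValueError(f"unknown workflowType: {workflow_type!r}")
    return workflow_type
```

Injecting the senders also makes the routing easy to test with simple fakes, without AWS credentials or network access.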
Deploy it:

Shell
    cd shared-resources
    sam build
    sam deploy --guided

Step 2: The Durable Functions SAM Template

Now the ETL pipeline itself, built on Durable Functions. The key addition is the DurableConfig property, which tells Lambda to enable durable execution for your function.

YAML
    # durable-functions/template.yaml
    AWSTemplateFormatVersion: '2010-09-09'
    Transform: AWS::Serverless-2016-10-31
    Description: Lambda Durable Functions ETL Pipeline

    Globals:
      Function:
        Runtime: python3.14
        Architectures:
          - arm64
        MemorySize: 512
        Timeout: 900

    Resources:
      ETLOrchestratorFunction:
        Type: AWS::Serverless::Function
        Properties:
          FunctionName: ETLDurableOrchestrator
          CodeUri: ./src
          Handler: handlers/etl_handler.lambda_handler
          MemorySize: 1024
          Timeout: 900
          DurableConfig:
            ExecutionTimeout: 86400      # 24 hours for human approval
            RetentionPeriodInDays: 14    # Keep execution history for debugging
          AutoPublishAlias: live
          Policies:
            - AWSLambdaBasicExecutionRole
            - S3CrudPolicy:
                BucketName: !Sub "${RawBucketName}-${AWS::AccountId}"
            - S3CrudPolicy:
                BucketName: !Sub "${ProcessedBucketName}-${AWS::AccountId}"
            - DynamoDBCrudPolicy:
                TableName: !Ref ETLMetadataTable
            - DynamoDBCrudPolicy:
                TableName: etl-pending-approvals
            - SNSPublishMessagePolicy:
                TopicName: etl-approval-notifications
          Events:
            S3Upload:
              Type: S3
              Properties:
                Bucket: !Ref RawDataBucket
                Events: s3:ObjectCreated:*
                Filter:
                  S3Key:
                    Rules:
                      - Name: prefix
                        Value: uploads/
                      - Name: suffix
                        Value: .csv
          Environment:
            Variables:
              PROCESSED_BUCKET: !Sub "${ProcessedBucketName}-${AWS::AccountId}"
              METADATA_TABLE: !Ref ETLMetadataTable
              APPROVALS_TABLE: etl-pending-approvals
              APPROVAL_TOPIC_ARN: !ImportValue ETL-ApprovalTopicArn
              APPROVAL_API_URL: !ImportValue ETL-ApprovalApiUrl

A few things to notice here:
  • MemorySize: 1024 on the orchestrator (overrides the 512 MB global default). Since this single function does all the work, it needs more memory.
  • ExecutionTimeout: 86400 – This is the total workflow duration across all invocations (24 hours).
    The standard Timeout: 900 is the per-invocation limit (15 minutes). Each checkpoint/resume is a fresh invocation.
  • AutoPublishAlias: live – AWS recommends using Lambda versions with durable functions. If you update code while an execution is suspended, replay will use the version that started the execution.
  • S3 filter with prefix: uploads/ and suffix: .csv – Only CSV files under the uploads/ directory trigger the workflow.
  • The stack imports shared resources via !ImportValue: the approval table, SNS topic, and API URL from the shared stack.

Step 3: Writing the Durable Function

This is where it gets interesting. The entire ETL pipeline, including the approval wait, lives in a single Lambda function. No state machine definition. No JSON DSL. Just Python code.

First, the individual ETL steps. Each one is a regular Python function in a separate file:

Extract

Python
    import csv
    import io
    import boto3
    import logging

    logger = logging.getLogger()
    s3_client = boto3.client("s3")

    def extract_data(source_bucket, source_key, step_context=None):
        logger.info(f"Extracting from s3://{source_bucket}/{source_key}")
        response = s3_client.get_object(Bucket=source_bucket, Key=source_key)
        content = response["Body"].read().decode("utf-8")
        reader = csv.DictReader(io.StringIO(content))
        records = list(reader)
        schema = {
            "columns": reader.fieldnames,
            "source_file": source_key,
            "file_size_bytes": response["ContentLength"]
        }
        logger.info(f"Extracted {len(records)} records with {len(schema['columns'])} columns")
        return {"data": records, "record_count": len(records), "schema": schema}

Transform

Python
    import logging
    from datetime import datetime

    logger = logging.getLogger()

    def transform_data(raw_data, schema_config, step_context=None):
        logger.info(f"Transforming {len(raw_data)} records")
        valid_records, rejected_records = [], []
        for i, record in enumerate(raw_data):
            try:
                # Trim whitespace from every string value
                cleaned = {k: v.strip() if isinstance(v, str) else v for k, v in record.items()}
                if not cleaned.get("id") or not cleaned.get("name"):
                    rejected_records.append({"index": i, "reason": "Missing required field"})
                    continue
                if "date" in cleaned:
                    cleaned["date"] = normalize_date(cleaned["date"])
                cleaned["_processed_at"] = datetime.utcnow().isoformat()
                # Convert numeric columns; unparseable values become None
                for key in ["amount", "quantity", "price"]:
                    if key in cleaned and cleaned[key]:
                        try:
                            cleaned[key] = float(cleaned[key])
                        except ValueError:
                            cleaned[key] = None
                valid_records.append(cleaned)
            except Exception as e:
                rejected_records.append({"index": i, "reason": str(e)})
        return {
            "data": valid_records,
            "valid_records": len(valid_records),
            "rejected_records": len(rejected_records),
            "rejection_details": rejected_records[:100]
        }

    def normalize_date(date_str):
        # Try each supported format; return the input unchanged if none match
        for fmt in ["%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y", "%Y/%m/%d"]:
            try:
                return datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
            except ValueError:
                continue
        return date_str

Load

Python
    import json
    import boto3
    import logging

    logger = logging.getLogger()
    s3_client = boto3.client("s3")

    def load_data(transformed_data, target_bucket, target_key, step_context=None):
        logger.info(f"Loading {len(transformed_data)} records to s3://{target_bucket}/{target_key}")
        # One JSON document per line (JSON Lines)
        output_lines = "\n".join(json.dumps(r) for r in transformed_data)
        s3_client.put_object(
            Bucket=target_bucket,
            Key=target_key,
            Body=output_lines.encode("utf-8"),
            ContentType="application/jsonl",
            Metadata={"record_count": str(len(transformed_data))}
        )
        summary = {
            "record_count": len(transformed_data),
            "columns": list(transformed_data[0].keys()) if transformed_data else [],
            "sample_records": transformed_data[:3]
        }
        return {"target_path": f"s3://{target_bucket}/{target_key}",
                "record_count": len(transformed_data),
                "summary": summary}

Notice the steps are plain Python functions: no special decorator, no SDK import. They take step_context=None as an optional last parameter, which keeps them testable outside the durable execution context.
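Because the steps are plain functions, they can be exercised directly with ordinary assertions. As a small illustration, the block below reproduces the normalize_date helper from the transform step and checks each supported input format; the check_normalize_date wrapper is a hypothetical test function, not part of the article's project.

```python
from datetime import datetime


def normalize_date(date_str):
    """Copy of the transform step's helper: normalize known formats to YYYY-MM-DD."""
    for fmt in ["%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y", "%Y/%m/%d"]:
        try:
            return datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return date_str


def check_normalize_date():
    # One input per supported format, all describing 31 January 2025.
    assert normalize_date("2025-01-31") == "2025-01-31"
    assert normalize_date("01/31/2025") == "2025-01-31"
    assert normalize_date("31-01-2025") == "2025-01-31"
    assert normalize_date("2025/01/31") == "2025-01-31"
    # Unrecognized input is returned unchanged rather than raising.
    assert normalize_date("not a date") == "not a date"


check_normalize_date()
```

The same approach extends to transform_data, which needs only a list of dictionaries as input and no AWS access.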
Now the main ETL orchestrator that ties it all together:

Python

    import json
    import os
    import logging
    from datetime import datetime
    from urllib.parse import unquote_plus

    from aws_durable_execution_sdk_python import durable_execution, DurableContext

    from steps.extract import extract_data
    from steps.transform import transform_data
    from steps.load import load_data
    from steps.finalize import finalize_job

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    PROCESSED_BUCKET = os.environ.get("PROCESSED_BUCKET")
    METADATA_TABLE = os.environ.get("METADATA_TABLE")


    @durable_execution
    def lambda_handler(event, context: DurableContext):
        # Handle both S3 event format and direct invocation
        if "Records" in event:
            s3_event = event["Records"][0]["s3"]
            source_bucket = s3_event["bucket"]["name"]
            # S3 event notifications URL-encode object keys, so decode before use
            source_key = unquote_plus(s3_event["object"]["key"])
        else:
            source_bucket = event.get("bucket")
            source_key = event.get("key")

        # Generate job_id deterministically using context.step()
        job_id = context.step(
            lambda _: f"etl-durable-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-"
                      f"{source_key.split('/')[-1]}",
            name="generate-job-id"
        )
        context.logger.info(f"Starting ETL job: {job_id}")

        # Step 1: Extract
        extracted = context.step(
            lambda _: extract_data(source_bucket, source_key, None),
            name="extract-data"
        )
        context.logger.info(f"Extracted {extracted['record_count']} records")

        # Step 2: Transform
        transformed = context.step(
            lambda _: transform_data(extracted["data"], extracted.get("schema", {}), None),
            name="transform-data"
        )

        # Step 3: Load
        loaded = context.step(
            lambda _: load_data(transformed["data"], PROCESSED_BUCKET,
                                f"processed/{job_id}/output.jsonl", None),
            name="load-data"
        )

        # --- EXECUTION PAUSES HERE ---
        # The submitter function stores the callback_id in DynamoDB
        # and sends an SNS notification to the reviewer.
        # No compute charges while waiting for approval.
        def submit_for_approval(callback_id: str, ctx):
            return notify_reviewer(job_id, callback_id, loaded["summary"])

        approval = context.wait_for_callback(
            submitter=submit_for_approval,
            name="quality-check-approval"
        )

        # Parse approval result
        if isinstance(approval, str):
            approval = json.loads(approval)

        if not approval or not approval.get("approved"):
            # approval may be empty here, so fall back to an empty dict before reading the reason
            reason = (approval or {}).get("reason", "No reason")
            return {"status": "REJECTED", "job_id": job_id, "reason": reason}

        # Step 4: Finalize (only runs after approval)
        final = context.step(
            lambda _: finalize_job(job_id, source_bucket, source_key, loaded,
                                   approval, METADATA_TABLE, None),
            name="finalize-job"
        )

        return {
            "status": "COMPLETED",
            "job_id": job_id,
            "records_processed": transformed["valid_records"],
            "output_path": loaded["target_path"],
            "approved_by": approval.get("reviewer"),
            "completed_at": final["completed_at"]
        }

Let me break down the important parts:

  • @durable_execution – This decorator (imported from aws_durable_execution_sdk_python) enables the checkpoint/replay mechanism on the handler.
  • context.step(lambda _: ..., name="...") – Each step call creates a checkpoint. On replay, completed steps return their cached results instantly instead of re-executing.
  • context.wait_for_callback(submitter=..., name="...") – This is the zero-cost waiting magic. The submitter function receives a callback_id, which gets stored in DynamoDB. Execution then pauses completely: Lambda saves the state, shuts down, and you stop paying.
  • Determinism matters – Notice job_id is generated inside a context.step(). That is intentional. Since Lambda replays your function from the beginning on resume, datetime.utcnow() would produce a different value on each replay. Wrapping it in a step ensures the timestamp gets checkpointed and replayed consistently.
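To make the replay behaviour concrete, here is a toy model in plain Python. ToyContext is a hypothetical stand-in written for this sketch, not the real DurableContext: it only mimics the idea that a named step's result is stored the first time it runs and read back from the store on replay. The fake clock is likewise an assumption, used so the effect is visible without waiting for real time to pass.

```python
import itertools


class ToyContext:
    """Hypothetical stand-in for a durable context: caches step results by name."""

    def __init__(self, checkpoints):
        self.checkpoints = checkpoints  # shared store that survives "invocations"

    def step(self, fn, name):
        # Run the step only if no checkpoint exists for this name
        if name not in self.checkpoints:
            self.checkpoints[name] = fn(None)
        return self.checkpoints[name]


_clock = itertools.count(1)  # fake clock that advances on every read


def now():
    return next(_clock)


def handler(ctx):
    unsafe_id = f"job-{now()}"  # computed again on every replay
    safe_id = ctx.step(lambda _: f"job-{now()}", name="generate-job-id")
    return unsafe_id, safe_id


store = {}
first = handler(ToyContext(store))   # initial invocation
replay = handler(ToyContext(store))  # resume: the handler runs from the top again
print("first :", first)
print("replay:", replay)
```

The value produced inside the step is identical across both runs, while the value computed directly in the handler body drifts. That drift is what the generate-job-id step guards against.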
The notify_reviewer function (in the same file) stores the callback details in DynamoDB and sends an SNS notification:

Python

    def notify_reviewer(job_id, callback_id, summary):
        import boto3
        from datetime import timedelta

        dynamodb = boto3.resource('dynamodb')
        sns_client = boto3.client('sns')

        approvals_table = os.environ.get('APPROVALS_TABLE', 'etl-pending-approvals')
        approval_topic_arn = os.environ.get('APPROVAL_TOPIC_ARN')
        approval_api_url = os.environ.get('APPROVAL_API_URL')

        table = dynamodb.Table(approvals_table)
        # Pending approvals expire after 24 hours via the table's TTL attribute
        ttl = int((datetime.utcnow() + timedelta(hours=24)).timestamp())

        table.put_item(Item={
            'jobId': job_id,
            'callbackId': callback_id,
            'functionArn': os.environ.get('AWS_LAMBDA_FUNCTION_NAME'),
            'workflowType': 'durable-functions',
            'summary': json.dumps(summary),
            'status': 'pending',
            'requestedAt': datetime.utcnow().isoformat(),
            'ttl': ttl
        })

        if approval_topic_arn:
            sns_client.publish(
                TopicArn=approval_topic_arn,
                Subject=f'ETL Job Approval Required: {job_id}',
                Message=f"Job ID: {job_id}\n"
                        f"Approve: POST {approval_api_url}/approve/{job_id}\n"
                        f"Reject: POST {approval_api_url}/reject/{job_id}"
            )

        return {"job_id": job_id, "callback_id": callback_id, "status": "pending"}

The workflowType: 'durable-functions' field is important: it tells the shared approval handler which callback mechanism to use when the reviewer responds.
Step 4: The Shared Approval Handler

When the reviewer clicks approve, the shared handler looks up the callbackId from DynamoDB and sends the callback to the paused durable execution:

Python

    # shared-resources/src/approval_handler.py (key excerpt)
    if workflow_type == 'durable-functions':
        callback_id = approval_record.get('callbackId')
        if approved:
            lambda_client.send_durable_execution_callback_success(
                CallbackId=callback_id,
                Result=json.dumps(approval_response)
            )
        else:
            lambda_client.send_durable_execution_callback_failure(
                CallbackId=callback_id,
                Error='JobRejected',
                Cause=reason or 'Job rejected by reviewer'
            )
    elif workflow_type == 'step-functions':
        task_token = approval_record.get('taskToken')
        if approved:
            stepfunctions.send_task_success(
                taskToken=task_token,
                output=json.dumps(approval_response)
            )

Same API, same reviewer experience; the underlying callback mechanism is the only thing that differs.

Step 5: Deploy and Test

Deploy in order (shared resources first, since the other stacks import from it):

Shell

    # 1. Deploy shared resources
    cd shared-resources
    sam build && sam deploy --guided

    # 2. Deploy Durable Functions
    cd ../durable-functions
    sam build && sam deploy --guided

Generate test data:

Shell

    python scripts/generate_test_data.py --count 10 --output test-data/

Upload files to trigger the workflow (note the uploads/ prefix, which the S3 filter requires):

Shell

    aws s3 cp test-data/ s3://etl-raw-data-bucket-YOUR_ACCOUNT_ID/uploads/ --recursive

Check approval status and approve:

Shell

    # Check status
    curl https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/status/<job-id>

    # Approve
    curl -X POST https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/approve/<job-id> \
      -H "Content-Type: application/json" \
      -d '{"reviewer": "harpreet", "reason": "Data looks good"}'

For bulk approvals during testing, the repo includes a handy script:

Shell

    ./scripts/approve_all_jobs.sh

For local testing, the testing SDK supports pytest:

Shell

    pip install aws-lambda-durable-execution-sdk-testing
    pytest durable-functions/tests/

Step 6 (Optional): Deploy Step Functions for Comparison

If you want to reproduce my full comparison, deploy the Step Functions stack too:

Shell

    cd step-functions
    sam build && sam deploy --guided

Here is what the same workflow looks like in Amazon States Language:

JSON

    {
      "StartAt": "ExtractData",
      "States": {
        "ExtractData": {
          "Type": "Task",
          "Resource": "${ExtractFunctionArn}",
          "ResultPath": "$.extractResult",
          "Next": "TransformData"
        },
        "TransformData": {
          "Type": "Task",
          "Resource": "${TransformFunctionArn}",
          "ResultPath": "$.transformResult",
          "Next": "LoadData"
        },
        "LoadData": {
          "Type": "Task",
          "Resource": "${LoadFunctionArn}",
          "ResultPath": "$.loadResult",
          "Next": "WaitForApproval"
        },
        "WaitForApproval": {
          "Type": "Task",
          "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
          "Parameters": {
            "FunctionName": "${ApprovalFunctionArn}",
            "Payload": {
              "taskToken.$": "$$.Task.Token",
              "jobId.$": "$.loadResult.job_id",
              "summary.$": "$.loadResult.summary"
            }
          },
          "TimeoutSeconds": 86400,
          "ResultPath": "$.approvalResult",
          "Next": "CheckApproval"
        },
        "CheckApproval": {
          "Type": "Choice",
          "Choices": [{
            "Variable": "$.approvalResult.approved",
            "BooleanEquals": true,
            "Next": "FinalizeJob"
          }],
          "Default": "JobRejected"
        },
        "JobRejected": {
          "Type": "Pass",
          "Result": { "status": "REJECTED" },
          "End": true
        },
        "FinalizeJob": {
          "Type": "Task",
          "Resource": "${FinalizeFunctionArn}",
          "End": true
        }
      }
    }

Compare the two approaches. Durable Functions: one Python file, one Lambda, familiar programming constructs. Step Functions: a JSON state machine definition, five separate Lambda functions, plus the ASL learning curve. Both do the same thing.

The Real Cost Numbers

Now, here is the part that made me rebuild a mental model I had about serverless orchestration costs.

I ran 1,000 CSV files through this exact workflow, both with Durable Functions and with the Step Functions implementation. The approval wait averaged about 20 minutes per document.

Cost Component | Durable Functions | Step Functions | Difference
Lambda invocations | $0.000358 | $0.001 | -64%
Lambda duration | $0.0308 | $0.0179 | +72%
State transitions | $0.000 | $0.175 | -100%
DynamoDB | $0.003 | $0.003 | 0%
S3 operations | $0.010 | $0.010 | 0%
TOTAL | $0.044 | $0.207 | -79%

Source: AWS CloudWatch Metrics

The 79% lower total is driven almost entirely by one thing: state transitions. Step Functions charges $0.025 per 1,000 state transitions. The ASL workflow has 7 states (ExtractData, TransformData, LoadData, WaitForApproval, CheckApproval, JobRejected/FinalizeJob). For 1,000 workflows, that is 7,000 transitions, which costs $0.175. That single line item (state transitions) is 84% of the total Step Functions cost.

Durable Functions eliminates state transition costs. The trade-off? Higher Lambda duration costs ($0.031 vs. $0.018), because the durable function runs with 1,024 MB of memory (a single function handling all the work), while the Step Functions version uses 512 MB per function across five smaller functions.
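The arithmetic behind the state-transition line is easy to check. Here is a minimal sketch that uses only the figures quoted above; the variable names are mine, and the totals are the rounded values from the cost table.

```python
# Figures quoted in the article
PRICE_PER_1000_TRANSITIONS = 0.025  # USD, Step Functions state transitions
STATES_PER_WORKFLOW = 7
WORKFLOWS = 1_000

STEP_FUNCTIONS_TOTAL = 0.207  # USD, from the cost table
DURABLE_TOTAL = 0.044         # USD, from the cost table

transitions = STATES_PER_WORKFLOW * WORKFLOWS
transition_cost = transitions / 1_000 * PRICE_PER_1000_TRANSITIONS

# Share of the Step Functions bill that comes from transitions alone
share = transition_cost / STEP_FUNCTIONS_TOTAL
# Relative saving of Durable Functions against Step Functions
savings = 1 - DURABLE_TOTAL / STEP_FUNCTIONS_TOTAL

print(f"{transitions} transitions cost ${transition_cost:.3f}")
print(f"Share of Step Functions total: {share:.1%}")
print(f"Durable Functions savings: {savings:.1%}")
```

Working from the rounded table totals, the share lands between 84% and 85% and the savings round to 79%, which matches the figures in the text.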
At scale, the difference adds up quickly:

Daily Volume | Durable Functions/year | Step Functions/year | Annual Savings
1,000/day | $16.06 | $75.56 | $59.50
10,000/day | $160.60 | $755.60 | $595
100,000/day | $1,606 | $7,556 | $5,950

And the most important validation: both systems achieved $0 compute cost during the 20-minute approval wait. That is the real game-changer compared to polling or always-on servers.

Understanding the Replay Model

One thing that confused me initially was the invocation count. I expected 1,000 invocations for 1,000 workflows. Instead, I got 1,788. Here is why.

The checkpoint/replay model means each workflow requires a minimum of 2 invocations:

  • Initial invocation: the S3 trigger fires and the function runs generate-job-id → extract → transform → load → submit-for-approval → pause
  • Resume invocation: the callback is received, the function replays from the beginning (all completed steps return cached results instantly), then executes the finalize step

So 1,000 fully completed workflows need at least 2,000 invocations. My count was 1,788 because some workflows were still pending approval when I collected the metrics over the 24-hour measurement window.

The important thing to remember: your code must be deterministic. Since Lambda replays your function from the beginning on resume, any non-deterministic operations (random numbers, timestamps, external API calls) must happen inside context.step() blocks so their results get checkpointed.

Python

    job_id = context.step(
        lambda _: f"etl-durable-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-"
                  f"{source_key.split('/')[-1]}",
        name="generate-job-id"
    )

That is exactly why the job_id generation in our code uses context.step(). Without it, the timestamp would change on every replay.
Here are some other common sources of non-determinism and how to handle them:

Non-Deterministic Operation | Why It Breaks | Solution
Math.random() | Different value on every replay | Wrap in context.step()
Date.now() | Time keeps moving forward | Use context.timestamp or wrap in a step
Global variables | Might change between replays | Pass state through function arguments
External API calls | Responses can differ between replays | Always wrap in context.step()
Iterating over Map or Set | Iteration order can vary by runtime | Use arrays or ensure stable ordering

When Not to Use Durable Functions

I want to be honest about the trade-offs, because this is not a "Durable Functions is better than everything" story.

Choose Step Functions when:

  • Visual debugging matters. The Step Functions state machine execution graph is genuinely superior. You can see exactly which step failed, inspect the input/output of each state, and non-technical stakeholders can actually understand what the workflow is doing. Durable Functions also comes with visual analysis, monitoring, and debugging, but it is geared more toward developers.
  • Multi-service orchestration. Step Functions has 220+ native AWS service integrations, including DynamoDB, SQS, SNS, ECS, and Glue, with no Lambda glue code required. In our Step Functions implementation, the ASL connects directly to Lambda function ARNs with built-in retry policies. With Durable Functions, all integrations go through your Lambda code.
  • Express Workflows apply. For short-duration (under 5 minutes), high-throughput workflows, Step Functions Express Workflows use a different pricing model that can be very competitive.

Choose Durable Functions when:

  • Cost optimization is the priority (79% savings in this comparison)
  • Workflows are Lambda-centric (your logic lives in Lambda code anyway)
  • You prefer writing orchestration in Python or TypeScript over Amazon States Language. AWS has also just released Lambda Durable Functions for Java in developer preview.
  • Your logic is complex, and your developers prefer a general-purpose programming language over the declarative ASL.

AWS recommends a hybrid approach: use Durable Functions for application-level logic within Lambda, and Step Functions for high-level orchestration across multiple AWS services.

Concurrency Planning: A Quick Note

One thing worth mentioning: Durable Functions consolidates your entire workflow into a single Lambda function (ETLDurableOrchestrator in our case). This means your Lambda concurrency quota directly limits how many workflows can run simultaneously. Step Functions distributes execution across five separate Lambda functions (Extract, Transform, Load, Approval, Finalize), spreading the concurrency demand.

In practice, this means you should plan your Lambda concurrency quotas carefully when using Durable Functions. If you expect burst uploads of hundreds or thousands of files at once, set reserved concurrency appropriately for your workload. This applies to both services; the difference is just where the concurrency demand concentrates.

Wrapping Up

Lambda Durable Functions is a genuinely useful addition to the serverless toolkit. For a simple ETL pipeline with human-in-the-loop approval, it delivered 79% cost savings over Step Functions while achieving the same 100% success rate and zero-cost waiting.

The code-first approach feels natural if you are already comfortable writing Lambda functions in Python, TypeScript, or Java. The wait_for_callback pattern for human approvals is clean and straightforward. And the cost savings are real: they come almost entirely from eliminating state transition charges.

That said, Step Functions remains the better choice when visual workflow representation, multi-service orchestration, or operational simplicity are your priorities. There is no universal winner here; the right choice depends on what your team values more.
The complete implementation — both SAM stacks, shared approval infrastructure, test data generation scripts, bulk approval scripts, and detailed cost analysis — is available here: github.com/hsiddhu2/aws-lambda-durable-vs-stepfunctions. Clone it, deploy both implementations, run your own 1,000-file comparison, and see the numbers for yourself. The ~79% cost advantage held consistent for this workflow, but your number will vary based on workflow complexity and state count.

By Harpreet Siddhu
From AI Chaos to Control: Building Enterprise-Grade LLM Gateways With MuleSoft Anypoint

Artificial intelligence is no longer experimental. It has become an important part of how organizations are building AI agents and applications. From chatbots to autonomous systems, companies are rapidly integrating large language models (LLMs) into their workflows to improve efficiency, automate tasks, and enhance user experiences. However, as adoption grows, so does the complexity around managing these systems.

Challenges in Scaling LLM Usage

When organizations start using multiple LLMs across different teams and applications, several challenges naturally emerge:

  • Lack of visibility into how models are being used
  • Multiple LLM providers used independently by different teams
  • Governance and compliance challenges
  • Difficulty in coordinating multiple AI agents
  • Rising and unpredictable operational costs
  • Security and data privacy risks

In many cases, different teams begin integrating their own preferred LLMs without a shared strategy. While this approach may work initially, it quickly becomes difficult to manage at scale. This situation often leads to what can be described as AI sprawl, where systems become fragmented, inconsistent, and harder to govern centrally. Over time, this affects not only cost and security but also the overall reliability of AI-driven applications.

The Problem: AI at Scale Becomes Hard to Manage

When teams directly connect their applications to different LLM providers, a few common issues appear:

  • Each team may choose a different LLM provider based on preference or convenience
  • Costs become difficult to track across departments
  • Sensitive or regulated data may be exposed if proper controls are missing
  • There is no standard governance model across the organization
  • Each team builds and maintains its own integration logic

This leads to duplicated effort, inconsistent implementations, and limited visibility for platform or IT teams responsible for oversight.
As a result, organizations struggle to maintain control over their AI ecosystem while still trying to innovate quickly.

Introducing a Centralized Approach: AI Gateway

To address these challenges, MuleSoft introduces the concept of an AI Gateway as part of the Anypoint Platform. The AI Gateway acts as a centralized control layer for all LLM requests. Instead of applications connecting directly to different LLM providers, they communicate through a single entry point.

This gateway helps in:

  • Routing requests to appropriate models
  • Applying security and governance policies
  • Tracking usage and cost across teams
  • Maintaining consistency in AI interactions

In simpler terms, it works like a traffic controller for AI requests, ensuring that everything flows in a controlled, secure, and observable manner.

Key Capabilities of AI Gateway

1. Unified Access to Multiple LLMs

One of the main advantages of an AI Gateway is that it provides a single unified endpoint for accessing multiple LLM providers. Instead of integrating each model separately, developers can connect once and access different models through the same interface. This also makes it easier to switch between providers if needed, without major changes to application logic.

2. Intelligent Routing of Requests

Not all AI requests are the same. Some are simple and can be handled by smaller, cost-effective models, while others require more advanced reasoning capabilities. AI Gateway supports intelligent routing based on factors such as:

  • Complexity of the request
  • Cost considerations
  • Performance requirements

There are generally two types of routing approaches:

Model-Based Routing (Static Routing)

In this approach, a specific model is assigned to handle a particular type of request. This is useful when the requirements are well-defined.

Semantic Routing (Dynamic Routing)

Here, the system analyzes the request and automatically decides which model is best suited.
This may involve prompt classification or intent detection to improve accuracy and efficiency. This flexibility helps organizations balance performance and cost more effectively.

3. Cost Control and Visibility

One of the major concerns with LLM usage is unpredictable cost. Since most models are billed based on token usage, expenses can grow quickly if not monitored properly. AI Gateway provides visibility into:

  • Token usage per application
  • Usage across teams and departments
  • Overall consumption trends

This helps organizations set budgets, monitor spending, and avoid unexpected cost spikes.

4. Built-in Governance and Security

Security is a critical aspect when dealing with AI systems, especially when sensitive or regulated data is involved. AI Gateway helps enforce policies such as:

  • Token limits to control usage
  • Authentication and authorization mechanisms
  • Data protection measures like PII masking

These controls ensure that data is handled safely and that only authorized requests are processed. It also helps reduce the risk of exposing sensitive information to external systems.

5. Monitoring and Observability

Another important capability is visibility into how AI systems are performing in real time. Through the Anypoint Platform, organizations can access:

  • Detailed logs for every request
  • Usage analytics and trends
  • Debugging information for troubleshooting
  • Compliance and audit support

This level of observability is essential for maintaining reliability in production environments, especially when AI is used in business-critical workflows.

What Is an LLM Proxy?

An LLM Proxy is a unified interface layer that sits between applications and LLM providers. Instead of integrating directly with multiple APIs, developers interact with a single endpoint, and the proxy handles routing, security, and policy enforcement behind the scenes. This abstraction simplifies development and reduces the complexity of managing multiple integrations.
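As a rough illustration of the policy ideas above (static routing, a token limit, and PII masking behind one entry point), here is a toy sketch in Python. It is not MuleSoft's API and says nothing about how Anypoint implements these features: the model names, the route table, the word-count token estimate, and the email-only masking rule are all assumptions made for this example.

```python
import re

# Hypothetical static routing table: request type -> model name
ROUTES = {"summarize": "small-model", "reason": "large-model"}
DEFAULT_MODEL = "small-model"
MAX_TOKENS = 50  # assumed per-request limit

# Simplified pattern for email addresses, for illustration only
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


def mask_pii(text):
    """Replace anything that looks like an email address."""
    return EMAIL.sub("[REDACTED_EMAIL]", text)


def estimate_tokens(text):
    """Crude estimate: one token per whitespace-separated word."""
    return len(text.split())


def handle(request_type, prompt):
    """Apply the policies, then pick a model. Returns what would be forwarded."""
    if estimate_tokens(prompt) > MAX_TOKENS:
        raise ValueError("token limit exceeded")
    model = ROUTES.get(request_type, DEFAULT_MODEL)
    return {"model": model, "prompt": mask_pii(prompt)}


result = handle("reason", "Contact alice@example.com about the invoice")
print(result)
```

A semantic router would replace the ROUTES lookup with a classifier over the prompt itself; the policy checks around it would stay the same.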
How an LLM Proxy Works

A typical flow looks like this:

  1. An application or AI agent sends a request to the LLM Proxy endpoint
  2. The proxy (running on a gateway such as Flex Gateway) receives the request
  3. Policies such as authentication, token limits, and data masking are applied
  4. The request is routed to the most appropriate LLM provider
  5. The response is returned to the application

All of these steps are managed centrally through the platform, which reduces the need for custom logic in each application.

High-Level Concept

At a high level, the architecture can be thought of as a control layer sitting between users and multiple LLM providers. Instead of point-to-point integrations, all traffic flows through a single managed layer. This simplifies both operational management and long-term scalability.

High-Level Architecture

[Figure: LLM Proxy Architecture]

Conclusion

AI adoption is growing rapidly, but managing it across large organizations introduces new challenges. Without proper structure, teams can end up with fragmented integrations, rising costs, and limited visibility into how AI is being used.

A centralized approach using an AI Gateway and LLM proxy helps address these issues by:

  • Providing a unified access layer for AI models
  • Enforcing governance and security policies consistently
  • Improving visibility into usage and costs
  • Supporting scalable and controlled AI adoption

This enables organizations to move from isolated AI experiments toward more structured, enterprise-ready AI systems. Take control of your AI ecosystem with MuleSoft AI Gateway, simplifying, securing, and scaling every interaction through the Anypoint Platform.

By Jitendra Bafna

Top Integration Experts


John Vester

Senior Staff Engineer,
Marqeta

IT professional with 30+ years expertise in app design and architecture, feature development, and project and team management. Currently focusing on establishing resilient cloud-based services running across multiple regions and zones. Additional expertise architecting (Spring Boot) Java and .NET APIs against leading client frameworks, CRM design, and Salesforce integration.

Thomas Jardinet

IT Architect,
Rhapsodies Conseil

As an IT Architect with strong experience in Integration topics (with multiple contributions for Dzone Tech and Ref Cards), I accompany business projects in defining their architectures, whether functional, application or technical, by studying with them the best path. I also have more than I also accompany them in the organizational side, and above all I seek intellectual and human exchange. I am also a supporter of flattened organizations, as I think it greatly improves productivity, robustness, and resilience of companies

The Latest Integration Topics

Spring AI Advisors: Chat Memory, Token Tracking, and Message Logging
Part 3 of a step-by-step tutorial that decorates the implementation with Spring AI advisors to demonstrate how certain production concerns may be addressed.
June 5, 2026
by Horatiu Dan DZone Core CORE
· 847 Views
Advanced Error Handling and Retry Patterns in Enterprise REST Integrations
Blind retries amplify outages fast. Classify failures first, jitter your backoff, and circuit-break early before cascading.
June 4, 2026
by Anil guntupalli
· 1,199 Views
Beyond Manual Annotation: Engineering Self-Correcting Pseudo-Labeling Pipelines
This article details a resilient pseudo-labeling architecture. It combines Redis ingestion, Matryoshka embeddings, XGBoost to neutralize self-training confirmation bias.
June 4, 2026
by Harshith Narasimhan Srivatsa
· 1,068 Views
Build a GitHub Slack Bot With AWS Bedrock and MCP, Part 2
Build a Slack bot using AWS Bedrock and MCP to answer GitHub questions. Learn setup, architecture, and how to extend it with new tools and data sources.
June 4, 2026
by Sangharsh Agarwal
· 1,129 Views
Building Threat Intelligence Pipelines Using Python, APIs, and Elasticsearch
STIX/TAXII in, ECS normalized, provenance preserved deterministic IDs, correct bulk writes, ingest pipelines keep threat indicator data reliable and queryable under load.
June 3, 2026
by Krishnaveni Musku
· 1,707 Views
Build a GitHub Slack Bot With AWS Bedrock and MCP, Part 1
Building a Slack bot with traditional APIs led to 400 lines of code. Using MCP and AWS Bedrock reduced complexity, enabling scalable, tool-driven automation.
June 3, 2026
by Sangharsh Agarwal
· 1,480 Views · 1 Like
Securing the AI Host: Spring AI MCP Server Communication With API Keys
Part 2 of a step-by-step tutorial that integrates an AI assistant with two dedicated MCP servers and secures the communication.
June 3, 2026
by Horatiu Dan DZone Core CORE
· 1,597 Views · 2 Likes
MuleSoft MCP and A2A in Production: What 17 Recipes Reveal
MuleSoft MCP and A2A shipped in 2025. Zero practitioner guides exist beyond basic setup. 17 recipes reveal the implementation ladder teams are missing.
June 3, 2026
by Balachandra Shakar Bisetty
· 844 Views
MuleSoft IDP: Enhancing Efficiency and Accuracy in Data Extraction
MuleSoft IDP uses AI to extract and structure data from documents like invoices and PDFs, helping automate workflows, reduce errors, and improve processing speed.
June 1, 2026
by Jitendra Bafna
· 1,128 Views
A Hands-On ABAP RESTful Programming Model Guide
BAPIs are legacy; replace them with RAP-based APIs and EML in S/4HANA 2022 for cleaner, cloud-ready, upgrade-safe ABAP that SAP actually maintains.
June 1, 2026
by Deepika Paturu
· 703 Views
Implementing Secure API Gateways for Microservices Architecture
Use Kong as an API gateway to centralize JWT auth, rate limiting, and access control across all microservices, keeping individual services focused on business logic.
May 29, 2026
by Mugunth Chandran
· 3,536 Views · 3 Likes
Event-Driven Pipelines With Apache Pulsar and Go
Build scalable, real-time pipelines with Apache Pulsar and Go using event-driven producers and consumers that communicate via Pulsar topics.
May 29, 2026
by Shivi Kashyap
· 2,491 Views
Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka
Define API, event, and DB contracts upfront to enable parallel development, catch breaking changes in CI, and maintain consistent, reliable integrations.
May 29, 2026
by Wallace Espindola
· 2,084 Views
Building a Zero-Cost Approval Workflow With AWS Lambda Durable Functions
Learn how to build an ETL pipeline with human-in-the-loop approval that costs nothing while waiting — and see real cost data from processing 1,000 documents.
May 28, 2026
by Harpreet Siddhu
· 3,570 Views
From AI Chaos to Control: Building Enterprise-Grade LLM Gateways With MuleSoft Anypoint
This article explains how an AI Gateway centralizes LLM access, enabling secure routing, governance, cost control, and visibility for scalable AI adoption.
May 28, 2026
by Jitendra Bafna
· 1,996 Views
Programmatic Brand Extraction: Pulling Logos, Colors, and Assets from Any URL
OpenBrand is an open-source library that extracts structured brand assets from any URL - available as an npm package, API, or AI agent skill.
May 27, 2026
by Yixing Jiang
· 1,958 Views
You Don't Get to Retrofit Trust: Why API Security Must Be Designed In, Not Bolted On
A field-level examination of how one startup got it right — and what the rest of the industry keeps getting catastrophically wrong.
May 27, 2026
by Igboanugo David Ugochukwu DZone Core CORE
· 3,068 Views
Designing API-First EMR Architectures in .NET: Enabling Modular Growth in Compliance-Driven Systems
API-first .NET architecture lets EMR platforms evolve safely — enforcing compliance, stabilizing contracts, and isolating UI changes from critical business logic.
May 26, 2026
by Ronak Pavasiya
· 1,324 Views
How Retry Storms Crash API-Led Systems: Bounded Reliability Patterns for Distributed Architectures
Unbounded retries and autoscaling can turn minor latency into cascading outages. API reliability must be bounded and load-aware to prevent retry storms.
May 22, 2026
by Manjeera Chanda
· 2,070 Views
Self-Hosted Inference Doesn’t Have to Be a Nightmare: How to Use GPUStack
GPUStack is an open-source tool that turns a bunch of scattered GPU machines into one managed cluster for deploying AI models behind an OpenAI-compatible API.
May 21, 2026
by Sandeep Sadarangani
· 3,464 Views