Threat intelligence becomes operationally valuable when indicator data can be collected continuously, normalized into a consistent schema, and queried fast enough to support enrichment and detection workflows. Standardized exchange formats such as STIX and transport protocols such as TAXII exist specifically to make machine-readable cyber threat intelligence easier to distribute at scale, while preserving enough structure for downstream correlation and context.

Operational Requirements That Shape Intelligence Pipelines

A threat intelligence pipeline is best treated as data engineering with security-specific constraints: provenance must remain intact, updates and revocations must be representable, and “freshness” should be measurable rather than assumed. STIX is explicitly designed to model cyber threat intelligence using typed objects with attributes, and it supports building richer context by linking objects through relationships rather than shipping flat indicator lists.

A practical pipeline design often separates raw ingestion from normalized storage. Raw ingestion preserves the original feed payload for auditability and reversibility, while normalized storage produces documents that are easy to match against telemetry. This split aligns with STIX’s modeling approach: producers may publish Indicators expressed as STIX patterns and connect them to other objects through relationship constructs, so consumers can choose between lightweight extraction of atomic values for matching and graph-style context for analysis.

Pulling From TAXII and Other APIs Without Losing Provenance

TAXII 2.1, published by OASIS Open, defines a RESTful API and related requirements for TAXII clients and servers to exchange cyber threat information in a scalable manner, with STIX 2.1 support described as mandatory to implement in the TAXII context.
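One way to keep raw ingestion auditable is to wrap every fetched response in a small provenance record before any parsing happens. The sketch below is a minimal illustration, not a STIX or TAXII structure: the `wrap_raw_payload` helper and its field names (`source`, `collection_id`, `fetched_at`, `sha256`, `raw`) are hypothetical. The content digest gives normalized documents a stable way to point back to the exact payload they were derived from.

```python
import hashlib
from datetime import datetime, timezone
from typing import Optional


def wrap_raw_payload(source_url: str, collection_id: str, raw_body: bytes,
                     fetched_at: Optional[datetime] = None) -> dict:
    """Wrap an untouched feed response in a provenance record (hypothetical schema)."""
    fetched_at = fetched_at or datetime.now(timezone.utc)
    return {
        "source": source_url,                  # where the payload was pulled from
        "collection_id": collection_id,        # TAXII collection or other feed scope
        "fetched_at": fetched_at.isoformat(),  # pull time, kept in UTC
        "sha256": hashlib.sha256(raw_body).hexdigest(),  # digest for audit and dedup
        "raw": raw_body.decode("utf-8"),       # JSON feeds are UTF-8; kept verbatim
    }


record = wrap_raw_payload(
    "https://taxii.example/api/collections/abc/objects/",
    "abc",
    b'{"objects": []}',
    datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```

Storing these records append-only keeps the normalized index free to be rebuilt from the original bytes if the mapping logic changes.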
The IANA media type registration for application/taxii+json also documents that the older application/vnd.oasis.taxii+json name is a deprecated alias. This matters in real integrations because content negotiation and strict header validation vary by server implementation.

TAXII 2.1 also formalized mechanics that directly affect pipeline correctness under load. The CTI documentation notes that TAXII 2.1 added limit and next URL parameters and updated content negotiation and media types. This reflects a move toward pagination patterns that handle large or rapidly changing datasets more safely than item-based offset pagination. A Python pipeline can either implement paging logic directly or delegate it to a client library. The taxii2client project documents a TAXII 2.1 client API that uses application/taxii+json;version=2.1 for Accept handling and provides an as_pages helper for TAXII 2.1 endpoints that support pagination, including “Get Objects” and “Get Manifest.”

```python
from taxii2client.v21 import as_pages


def iter_taxii_objects(collection, cursor, page_size=2000):
    """Yield STIX objects from a TAXII 2.1 collection that were added after `cursor`."""
    accept = "application/taxii+json;version=2.1"
    # as_pages follows the server's limit/next pagination on the caller's behalf
    for page in as_pages(collection.get_objects, per_request=page_size,
                         added_after=cursor, accept=accept):
        # get_objects normally returns a parsed envelope (dict); fall back to .json() otherwise
        envelope = page if isinstance(page, dict) else page.json()
        for obj in envelope.get("objects", []):
            yield obj
```

This pattern avoids embedding server-specific pagination tokens into pipeline logic while still enabling incremental collection reads. The cursor argument can be persisted as an ISO-8601 timestamp when the upstream provides a timestamp filter, a model commonly used by TAXII feed vendors. For example, ESET documents STIX 2.1 feeds delivered via TAXII 2.1 collections and describes an added_after filter parameter for retrieving objects added after a specified timestamp, alongside retention constraints that make incremental pulls operationally necessary.

Not all threat intelligence sources are TAXII-first.
MISP Project documentation describes a REST-accessible STIX export capability and explicitly notes that STIX XML export can be slow and lead to timeouts with large events or collections, while STIX JSON avoids that problem, making JSON a more stable transport choice for high-volume automation. The same ecosystem provides a published OpenAPI specification and a dedicated converter library, misp-stix, which supports bidirectional conversion across STIX versions, including STIX 2.1, and includes features such as pattern parsing and indicator-observable fingerprinting. This reduces the cost of maintaining bespoke mapping logic for every upstream source.

Normalization Into ECS and STIX-Aware Semantics

Normalization is where a pipeline either becomes queryable or becomes another archive. The Elastic Common Schema (ECS) threat field guidance explicitly frames threat.* as the mapping layer that normalizes threat intelligence indicators from many structures into consistent fields, and it links that normalization to detection and enrichment workflows such as indicator match rules. In particular, the guidance calls out normalizing indicators into threat.indicator.* so that disparate feeds can be queried consistently and used to build indicator matching logic without treating every provider as a special case.

Atomic indicators benefit from being stored both as a typed value and alongside vendor identifiers. ECS defines threat.indicator.type values aligned with cyber observable types and documents threat.indicator.id as a place to store indicator IDs, noting that a STIX 2.x indicator ID is a common approach and that the field can hold multiple values to represent the same indicator across systems. The practical implication is that a pipeline can preserve the upstream STIX identifier, attach a stable provider-local identifier when necessary, and still normalize the matchable indicator value into fields such as threat.indicator.ip or other threat.indicator.* subfields.
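Because threat.indicator.id can carry more than one value, one simple convention is to keep the upstream STIX ID first and append a provider-scoped local ID only when one exists. The `indicator_ids` helper and the `provider:local_id` format below are assumptions for illustration, not something ECS prescribes.

```python
from typing import Optional


def indicator_ids(stix_id: Optional[str], provider: Optional[str] = None,
                  local_id: Optional[str] = None) -> list:
    """Build the value list for threat.indicator.id, upstream STIX ID first, no duplicates."""
    candidates = [stix_id]
    if provider and local_id:
        # Hypothetical provider-scoped convention, e.g. "misp:1234"
        candidates.append(f"{provider}:{local_id}")
    ids = []
    for candidate in candidates:
        if candidate and candidate not in ids:
            ids.append(candidate)
    return ids
```

For example, `indicator_ids("indicator--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f", "misp", "1234")` keeps both identifiers, so a lookup by either the STIX ID or the provider ID reaches the same document.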
```python
def stix_confidence_to_nlmh(value):
    """Map a STIX 0-100 confidence value to the ECS None/Low/Medium/High scale."""
    if value is None:
        return "Not Specified"
    v = int(value)
    if v == 0:
        return "None"
    if 1 <= v <= 29:
        return "Low"
    if 30 <= v <= 69:
        return "Medium"
    if 70 <= v <= 100:
        return "High"
    return "Not Specified"  # out-of-range values are treated as unknown


def extract_atomic_from_pattern(pattern):
    """Return (observable_type, value) for simple single-value patterns, else (None, None)."""
    p = (pattern or "").strip()
    for obj_type in ("ipv4-addr", "ipv6-addr", "domain-name", "url"):
        if obj_type + ":value" in p and "'" in p:
            # Take the first single-quoted literal, e.g. [ipv4-addr:value = '198.51.100.7']
            return (obj_type, p.split("'")[1])
    return (None, None)


def stix_indicator_to_ecs(indicator_obj, provider, fetched_at_iso):
    itype, ivalue = extract_atomic_from_pattern(indicator_obj.get("pattern"))
    if not itype:
        return None  # skip patterns that are not simple atomic comparisons
    doc = {
        "@timestamp": fetched_at_iso,
        "event": {"kind": "enrichment", "category": ["threat"], "type": ["indicator"]},
        "threat": {
            "indicator": {
                "type": itype,
                "provider": provider,
                "name": indicator_obj.get("name") or ivalue,
                "description": indicator_obj.get("description"),
                "confidence": stix_confidence_to_nlmh(indicator_obj.get("confidence")),
                "reference": indicator_obj.get("id"),
                "id": [indicator_obj.get("id")],  # upstream STIX ID preserved
            }
        },
        "labels": {"feed": provider},
    }
    if itype in {"ipv4-addr", "ipv6-addr"}:
        doc["threat"]["indicator"]["ip"] = ivalue
    return doc
```

The extraction logic deliberately limits itself to common “atomic” patterns to keep parsing deterministic and to minimize the risk of silently incorrect field derivation. This constraint matches the operational intent of ECS indicator guidance, which emphasizes consistent querying and reuse for indicator match rules after normalization, rather than attempting to fully interpret every possible composite STIX pattern in real time.

Indexing Strategy in Elasticsearch That Avoids Accidental Cost Explosion

Elasticsearch storage decisions are not purely operational preferences, because they change which update patterns are safe.
Data streams consist of one or more hidden backing indices and require a matching index template; every document indexed into a data stream must include an @timestamp field mapped as a date or date_nanos field type. The documentation describes data streams as a good fit for most time-series use cases, while explicitly flagging that frequent reuse of the same _id with last-write-wins expectations suggests an index alias with a write index is a better fit than a data stream. Threat intelligence pipelines often straddle that boundary: indicator state changes and revocations benefit from upsert semantics, while ingestion audits benefit from append-only history.

Retention should be tied to query strategy. Elastic Security documentation warns that indicator match rules can consume significant resources and recommends limiting the indicator index query time range to the minimum necessary for coverage, with a default example query of the past 30 days. Even outside an alerting engine, a time-bounded indicator set tends to be operationally safer: it reduces scan cost, makes cache behavior more predictable, and avoids matching against long-expired infrastructure that is no longer relevant. When vendor retention is narrower, such as the 14-day retention window described for some TAXII feeds, the pipeline should persist that constraint as a policy and avoid relying on “full historical replay” as a recovery mechanism.

Ingestion-Time Guardrails With Python, Ingest Pipelines, and Bulk Writes

Ingest pipelines provide an explicit place to enforce normalization rules at ingest time. Elastic documentation describes an ingest pipeline as a series of processors that run in order to transform data before it is indexed into a data stream or index, supporting operations such as removal, extraction, and enrichment.
In addition, ingest processors can access ingest metadata under the _ingest key. Elasticsearch notes that pipelines create _ingest.timestamp by default and that indexing ingest metadata requires explicitly copying it with a processor, which is what the set processor for event.ingested does below.

```json
PUT /_ingest/pipeline/ti_normalize
{
  "description": "Normalize threat intel indicators into ECS threat.indicator.*",
  "processors": [
    { "set": { "field": "event.kind", "value": "enrichment" } },
    { "set": { "field": "event.category", "value": ["threat"] } },
    { "set": { "field": "event.type", "value": ["indicator"] } },
    { "set": { "field": "event.ingested", "value": "{{{_ingest.timestamp}}}" } },
    {
      "fingerprint": {
        "fields": ["threat.indicator.provider", "threat.indicator.type", "threat.indicator.ip"],
        "target_field": "threat.indicator.fingerprint",
        "method": "SHA-256",
        "ignore_missing": true
      }
    }
  ]
}
```

Bulk ingestion should follow Elasticsearch’s wire format rules. The bulk API documentation describes NDJSON requirements, including that the final line must end with a newline character and that JSON actions and sources must not be pretty printed, because newlines are literal delimiters. A Python producer can serialize documents into bulk batches, assign a deterministic _id derived from the provider and atomic indicator value to make writes idempotent, and optionally route documents through the normalization pipeline configured above.
```python
import json


def encode_json(obj):
    # Compact, single-line JSON: newlines are NDJSON delimiters, so never pretty print
    return json.dumps(obj, separators=(",", ":"))


def build_indicator_id(provider, itype, ivalue):
    return (provider + ":" + itype + ":" + ivalue).lower()


def bulk_index_indicators(es_http, index_name, docs):
    # es_http: any thin HTTP wrapper exposing post(path, body=..., headers=...)
    lines = []
    for d in docs:
        ti = d.get("threat", {}).get("indicator", {})
        doc_id = build_indicator_id(ti.get("provider", "unknown"),
                                    ti.get("type", "unknown"),
                                    ti.get("ip", ti.get("name", "unknown")))
        # "index" with a fixed _id overwrites (last-write-wins), so target a regular index
        # or an alias with a write index; data streams only accept the "create" action
        lines.append(encode_json({"index": {"_index": index_name, "_id": doc_id,
                                            "pipeline": "ti_normalize"}}))
        lines.append(encode_json(d))
    payload = "\n".join(lines) + "\n"  # the trailing newline is mandatory
    return es_http.post("/_bulk", body=payload,
                        headers={"Content-Type": "application/x-ndjson"})
```

The NDJSON newline termination is not optional, so building the payload in a way that always emits a trailing newline avoids a class of partial-ingest failures that are hard to diagnose under load.

For enrichment use cases, ingest-time joins should be applied cautiously. Elastic warns that the enrich processor can impact ingest speed, recommends benchmarking, and explicitly states that it is not recommended for appending real-time data, since it works best with reference data that does not change frequently. This guidance aligns with threat intelligence practice: fast-changing indicators typically work better as a queried dataset, joined at search or detection time, than as an ingest-time enrichment applied to every event.

Conclusion

A threat intelligence pipeline built on Python, APIs, and Elasticsearch becomes reliable when it treats schemas, media types, and update semantics as core engineering concerns instead of integration details. STIX and TAXII provide standard object modeling and transport expectations, including content negotiation and pagination mechanics, while ECS provides a target schema that makes indicators consistently queryable and directly usable by matching workflows such as indicator match rules.
High-quality implementations preserve provenance, normalize into threat.indicator.* with STIX-aligned confidence semantics, choose an indexing strategy that matches expected update patterns, and enforce ingestion guardrails through ingest pipelines, simulation, and NDJSON-correct bulk writes.
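Writing NDJSON correctly is only half of the bulk-write guardrail. A _bulk request can return HTTP 200 while individual actions fail, and Elasticsearch reports this through a top-level `errors` flag plus per-item `status` and `error` fields. The minimal sketch below surfaces those failures. It assumes the response body has already been parsed into a dict, and `failed_bulk_items` is a hypothetical helper name.

```python
def failed_bulk_items(response: dict) -> list:
    """Return (_id, status, error_type) for each failed action in a parsed _bulk response."""
    if not response.get("errors"):
        return []  # the top-level flag is false when every action succeeded
    failures = []
    for item in response.get("items", []):
        # Each item has a single key naming the action: index, create, update, or delete
        result = next(iter(item.values()))
        error = result.get("error")
        if error:
            failures.append((result.get("_id"), result.get("status"), error.get("type")))
    return failures
```

The returned IDs could then be retried or written to a dead-letter index, so partial failures are recorded rather than dropped silently.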
What This Series Is About

This is Part 2 of a two-part series on building a Slack bot that answers natural language questions about a GitHub repository using AWS Bedrock (Claude) and GitHub's official Model Context Protocol (MCP) server.

Part 1 covered the why: most AI tools suggest wrapping GitHub's REST API and feeding the response to a model. That approach works, but it produces brittle glue code that grows with every new question type and every new data source. MCP offers a fundamentally better pattern: a tool registry that the model queries at runtime, making routing decisions autonomously. The result is a 150-line bot that answers questions you never anticipated and extends to new data sources with four lines of configuration.

If you have not read Part 1, it is available here: https://dzone.com/articles/build-a-github-slack-bot-with-aws-bedrock-and-mcp. The full project code is on GitHub: https://github.com/sangharshcs/slack-github-mcp-bot.

This article covers the implementation: the four key architectural pieces, how to get it running, how to extend it to new MCP servers, and the production lessons from running it on a real engineering team.

How It Is Built: The Four Key Pieces

The bot has four distinct components. Understanding each one separately makes the whole system easier to reason about and extend.

1. The MCP Request Function

All communication with GitHub's MCP server goes through a single function. GitHub MCP returns Server-Sent Events (SSE) rather than plain JSON, so the function handles both response types transparently. It also checks the HTTP status and surfaces MCP-level errors cleanly; without this, a 401 or 500 from the server fails silently.

The function signature accepts the endpoint and headers as parameters rather than hardcoded values. This is the detail that makes the whole system extensible: the same function routes to GitHub today and to any other MCP server tomorrow.

2. The Tool Registry

At startup, the bot calls tools/list on every connected MCP server and records which server owns each tool. This registry, a simple JavaScript object mapping each tool name to its endpoint and auth headers, is the entire routing mechanism. When Claude calls a tool, the bot looks up its origin and sends the request there.

Adding a new MCP server means calling the same loadServer() function with the new URL and credentials. The registry grows; the agent loop never changes. This four-line pattern is the extensibility mechanism Eric described as worth expanding on:

```javascript
// Same pattern for every MCP server you add:
const myServiceHeaders = {
  Authorization: `Bearer ${process.env.MY_SERVICE_TOKEN}`,
  'Content-Type': 'application/json',
  Accept: 'application/json, text/event-stream',
};
await loadServer(process.env.MY_SERVICE_MCP_URL, myServiceHeaders);
// Then add routing guidance to your system prompt.
// The agent loop below does not change.
```

3. The Agent Loop

The loop sends the question to Claude with the full tool list. Claude selects tools, the bot executes them via the registry, the results return to Claude, and the cycle repeats until Claude has enough to answer, typically after 3 to 8 tool calls.

The loop is generic: it does not know whether it is answering a bug question or a PR question. The system prompt configures the behavior, so the same code handles every question type, present and future.

4. The System Prompt

The system prompt is the highest-leverage piece in the entire system. The difference between a bot your team uses daily and one they quietly stop using is almost always prompt quality, not code quality. Three rules matter most:

- Explicit Slack markdown syntax. Claude defaults to standard Markdown. Without being told otherwise, it uses **bold** and [text](url), which Slack renders as raw asterisks and broken links. The prompt must specify *bold*, <url|text>, no # headings, and no markdown tables.
- High-volume handling. Without a rule, asking 'list all open issues' on a large repo returns an unusable wall of text. The prompt should specify: if results exceed 15 items, summarize by category and show the top 10.
- Tool routing for multiple servers. When you add a second MCP server, the prompt tells Claude which questions map to which server. This reduces unnecessary tool calls and keeps responses fast.

The complete index.js, package.json, and .env template are in the project repository at https://github.com/sangharshcs/slack-github-mcp-bot.

Getting It Running

Setup involves three external services (Slack, GitHub, and AWS Bedrock), each requiring a token. Rather than reproducing the full step-by-step here (that lives in the project README at https://github.com/sangharshcs/slack-github-mcp-bot), here is what each token is and where to get it.

- The Slack bot token (xoxb-...) comes from creating a Slack app at api.slack.com/apps with Socket Mode enabled. Socket Mode is what lets the bot run from your laptop without a public URL; it connects outbound over WebSocket. You also need an App-Level Token (xapp-...) for the socket connection itself, and a Signing Secret from Basic Information. The bot needs these scopes: app_mentions:read, chat:write, im:history, im:write.
- The GitHub token is a fine-grained personal access token from github.com/settings/tokens. Select your target repository and grant read access to Issues, Pull Requests, Contents, and Metadata. No write access is needed.
- The Bedrock API key comes from the AWS console under Amazon Bedrock → API keys. Enable the Claude Sonnet 4.6 model under Model access first. One detail that catches everyone: Claude 4.x models require a cross-region inference profile prefix. Use us.anthropic.claude-sonnet-4-6, not anthropic.claude-sonnet-4-6. The bare ID returns "on-demand throughput isn't supported".

With credentials in .env, running npm install and node index.js is all it takes.
The bot logs the number of GitHub tools loaded and is ready to receive mentions.

Extending to Other MCP Servers

loadServer() is the entire extension mechanism. Call it with any MCP-compatible service. The registry grows, Claude discovers the new tools, and you add one line to the system prompt describing when to use them.

| MCP server | URL (or SDK) | What it adds |
| --- | --- | --- |
| Linear | mcp.linear.app/mcp | Issues, projects, cycles, roadmaps |
| Cloudflare | mcp.cloudflare.com/mcp | Workers, analytics, DNS, R2 storage |
| Stripe | mcp.stripe.com/mcp | Payments, customers, subscriptions |
| Custom | @modelcontextprotocol/sdk | Any internal REST API as MCP tools |

What We Ran Into in Production

We have been running this bot on a busy engineering repository for several months. Before sharing the limitations we documented, it is worth saying that none of them were showstoppers. They were real, though, and ignoring them would leave you unprepared.

The biggest adjustment was latency. Complex queries that trigger 6 to 8 tool calls take 15 to 30 seconds. We handled this with the thinking-indicator pattern (post a placeholder message immediately, then update it when the answer is ready), which kept the experience feeling responsive even when the underlying calls were slow.

Debugging took more work than expected. When a traditional API client gives a wrong answer, the fix is obvious. When an agentic loop gives a wrong answer, you need to know which tools Claude chose, what they returned, and how Claude reasoned over the results. We solved this by logging every tool call (name, input, result, timestamp) and shipping those logs to our observability platform. That log became our primary debugging tool for agent behavior.

Prompt quality turned out to be load-bearing in a way we did not fully anticipate. Early versions of the bot would return raw asterisks in Slack, generate unusable walls of text for large result sets, and occasionally pick the wrong tool. Each of these was a prompt fix, not a code fix.
Investing time in the system prompt before going live would have saved us several rounds of iteration.

Cost is worth monitoring at scale. A query triggering 8 Bedrock calls costs meaningfully more than a single response. For an internal team tool used dozens of times a day, the cost is negligible; at higher volume, it warrants attention.

The productivity gain from not maintaining API clients has outweighed all of these constraints at the scale we operate. The right framing is not "is MCP perfect?" but "is MCP better than the alternative?" For our team, the answer has consistently been yes.

Conclusion

The architecture across these two articles is intentionally small: a tool registry, a generic agent loop, and a system prompt that configures behavior. That is all of it. The 150 lines in the repository are a starting point your team can run today and grow as your toolchain does.

Start with GitHub MCP. Get it answering questions in Slack. Test it with your team. Then look at what they ask most often and which data sources those questions touch. The next MCP server to register will be obvious, and the code to add it is four lines.

If you landed on Part 2 first and want to understand the architectural reasoning (why MCP is a fundamentally better pattern than the conventional REST API wrapper approach, and why it matters especially for SRE and platform teams), Part 1 covers all of that and is the recommended starting point.

Part 1: Why MCP Changes Everything About AI Tool Integration.
Full project code: https://github.com/sangharshcs/slack-github-mcp-bot.
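The bot in the repository is written in JavaScript, and its registry code is not reproduced in this article. The minimal Python sketch below is only a language-neutral illustration of the routing idea from "The Tool Registry," built on several assumptions. `ToolRegistry`, `load_server`, `dispatch`, and the `send` callback are hypothetical names. Tool names are passed in directly instead of being discovered with tools/list. The request body follows the JSON-RPC `tools/call` shape that MCP uses.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToolRegistry:
    """Hypothetical registry: tool name -> (endpoint, headers) of the owning MCP server."""
    routes: dict = field(default_factory=dict)

    def load_server(self, endpoint: str, headers: dict, tool_names: list) -> None:
        # The real bot discovers tool_names by calling tools/list on the server
        for name in tool_names:
            self.routes[name] = (endpoint, headers)

    def dispatch(self, tool_name: str, arguments: dict, send: Callable):
        # Look up which server owns the tool, then forward an MCP tools/call request there
        endpoint, headers = self.routes[tool_name]
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
        }
        return send(endpoint, headers, payload)


# Usage with a fake transport that just reports where each call was routed
registry = ToolRegistry()
registry.load_server("https://github-mcp.example/mcp", {"Authorization": "Bearer gh"}, ["list_issues"])
registry.load_server("https://linear-mcp.example/mcp", {"Authorization": "Bearer ln"}, ["list_projects"])
routed = registry.dispatch("list_projects", {}, lambda url, hdrs, body: (url, body["params"]["name"]))
```

Routing lives in data rather than in the loop, which is why a new server can be registered without touching `dispatch`.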
Real-time AI inference has become a fundamental feature of modern applications, powering conversational agents, recommendation engines, fraud detection, and computer vision pipelines. Unlike batch workloads, real-time inference requires low and stable latency, predictable scaling, and resource efficiency. As models grow in size and computational cost, delivering these experiences reliably becomes harder, particularly when balancing performance against operating cost.

Cloud Run, combined with GPU acceleration and Vertex AI, offers a simple, scalable, managed infrastructure for serving real-time machine learning models on Google Cloud. This architecture lets teams deploy containerized inference services that scale automatically with traffic while using GPUs for high-throughput model inference. Instead of deploying fixed clusters or provisioning resources manually, organizations can adopt a serverless-first approach that matches compute capacity to demand.

Together, these services let engineering teams build inference pipelines that resemble modern microservice platforms. Traffic is routed through controlled entry points, models run on specialized hardware, and observability is built into the platform. This model removes much of the underlying infrastructure complexity, letting developers concentrate on application logic while still achieving production-grade performance.

Deploying Low-Latency Inference With Cloud Run and GPUs

Cloud Run is a managed service that provides a serverless experience for deploying containerized workloads, which makes it a natural fit for real-time inference services.
When combined with GPU-equipped instances, Cloud Run can also run compute-intensive models while scaling automatically. This lets teams run models as stateless services that spin up when traffic arrives and scale down when idle, improving both responsiveness and cost efficiency.

In practice, models are packaged into containers that expose inference endpoints through thin APIs. These services can load models at startup and keep them in GPU memory so that requests execute quickly. Cloud Run handles traffic routing, instance management, and scaling, so there are no node pools or orchestration layers to manage. For latency-sensitive applications, teams can tune concurrency settings and set a minimum number of instances to reduce cold-start effects and keep response times predictable.

This deployment pattern can serve a wide variety of workloads, from transformer-based language models to vision inference pipelines. Because Cloud Run integrates with Google Cloud networking and identity services, inference endpoints can be placed behind an API gateway and protected with IAM-based authentication. This supports production deployments that meet enterprise security requirements while keeping the agility of serverless infrastructure.

Integrating Vertex AI for Model Management and Observability

While Cloud Run handles inference serving, Vertex AI provides an MLOps environment for managing models at scale. It gives teams a centralized system of record for model artifacts, experiment tracking, and versioning. This separation of concerns lets engineers iterate on models without coupling them to the serving infrastructure, while still being able to trace each iteration.

Vertex AI can also help teams track model performance and system behavior.
Operational metrics such as latency, throughput, and error rates can be gathered alongside model-specific metrics, helping teams notice regressions or slowdowns over time. Many organizations also send inference logs and prediction data to BigQuery for offline analysis of usage patterns and response quality. This feedback loop supports continuous improvement without interrupting live services.

In production, Vertex AI is often combined with CI/CD pipelines to promote models automatically across environments. New versions can be validated in staging and then deployed to Cloud Run endpoints, keeping production stable while allowing fast iteration. This mirrors modern software delivery practice, where machine learning models are treated as versioned components of a broader application ecosystem.

Scaling, Cost Optimization, and Production Readiness

Scaling real-time inference requires close attention to both cost and performance. GPUs provide substantial acceleration, but they must be well utilized to justify their cost. Cloud Run's request-driven scaling matches resources to actual demand, and batching strategies and concurrency controls can raise utilization during peak load. Teams combine these techniques with caching and request deduplication to further improve throughput.

Production readiness also requires security and governance. Inference services typically run under dedicated service accounts with least-privilege permissions, and sensitive data is protected with encryption and access controls. To protect privacy, inference traffic can be kept within trusted environments by restricting connectivity between networks with firewall rules and private network links.
These controls help companies launch AI services that comply with internal policies and regulations.

Ultimately, effective real-time inference systems resemble well-built cloud-native systems: they are observable, automated, and continuously refined. In contrast to more traditional approaches to building AI platforms, combining Cloud Run for scalable serving, GPUs for performance, and Vertex AI for lifecycle management lets organizations build AI platforms that deliver low-latency experiences with operational discipline. Together, these services let teams move beyond experimentation and deliver reliable AI functionality at enterprise scale.
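Two of the practices mentioned above, loading a model once at startup and caching repeated requests, can be illustrated together. The minimal, framework-free Python sketch below rests on several assumptions. `FakeModel` stands in for a real GPU-backed model, `handle_request` is a hypothetical handler, and the HTTP server that would call it is omitted. The cache lives in process memory, so each Cloud Run instance keeps its own copy, and the approach only suits models whose output is deterministic for a given input.

```python
from functools import lru_cache


class FakeModel:
    """Stand-in for a real model; counts how often inference actually runs."""

    def __init__(self) -> None:
        self.calls = 0

    def predict(self, text: str) -> str:
        self.calls += 1
        return text.upper()  # placeholder for real inference


# Loaded once when the container instance starts, then shared by every request it serves
MODEL = FakeModel()


@lru_cache(maxsize=1024)
def _cached_predict(normalized_text: str) -> str:
    return MODEL.predict(normalized_text)


def handle_request(text: str) -> str:
    # Collapse whitespace so trivially different inputs share one cache entry
    normalized = " ".join(text.split())
    return _cached_predict(normalized)
```

Repeated inputs are served from memory without touching the GPU, which frees capacity for requests that actually need inference.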
This post walks through building and running a real-world agentic workflow with Agentican and Quarkus. Specifically, the workflow automates market research and information sharing:
Identify the top vendors within a market category.
Research the positioning and strengths of each vendor.
Classify the findings as either standard or urgent.
Draft a brief to share with others in the company.
Prerequisites
Quarkus
Java 25
Maven (or Gradle)
An LLM provider API key
Step 1: Add the Dependency
Create a Quarkus app and add the Agentican Quarkus runtime module:
XML
<dependency>
    <groupId>ai.agentican</groupId>
    <artifactId>agentican-quarkus-runtime</artifactId>
    <version>0.1.0-alpha.3</version>
</dependency>
Step 2: Define Agents, Skills, and the Workflow
Create an `agentican-catalog.yaml` file on the classpath. This is where you describe:
Who does the work (agents)
What they need to do it (skills)
How they will do it (workflows)
YAML
agents:
  - id: researcher
    name: researcher
    role: |
      Expert at finding accurate, sourced information about companies and markets.
      Quotes sources. Distinguishes opinion from fact.
  - id: writer
    name: writer
    role: |
      Synthesizes research into structured, concise briefs.
      Avoids hedging language. Cites concrete evidence.
skills:
  - id: web-search
    name: web-search
    instructions: |
      When a question requires external information, call the search tool first.
      Quote sources in your answer.
Update the `agentican-catalog.yaml` file to define the workflow.
YAML
workflows:
  - id: market-brief
    name: market-brief
    description: Research vendors in a market and produce a structured brief
    outputStep: deliver
    params:
      - name: topic
        description: Market to research
        required: true
      - name: vendor_count
        description: Number of vendors
        defaultValue: "5"
    steps:
      - name: identify
        agent: researcher
        skills: [web-search]
        instructions: |
          Identify the top {{param.vendor_count}} vendors in {{param.topic}}.
          Return a JSON array of vendor names: names only, no commentary.
      - name: deep-dive
        type: loop
        over: identify
        steps:
          - name: analyze
            agent: researcher
            skills: [web-search]
            instructions: |
              Deep-dive vendor {{item}}: positioning, key strengths, recent news.
              Quote sources.
      - name: classify
        agent: writer
        instructions: |
          Read the per-vendor deep-dives below. If any vendor has launched a
          competitive feature in the last 30 days, return the single word 'urgent'.
          Otherwise return 'standard'.
          Deep-dives: {{step.deep-dive.output}}
        dependencies: [deep-dive]
      - name: deliver
        type: branch
        from: classify
        default: standard
        branches:
          - name: urgent
            steps:
              - name: urgent-brief
                agent: writer
                instructions: |
                  Synthesize a vendor brief flagged URGENT for executive review.
                  Lead with the recent competitive moves.
                  Topic: {{param.topic}}
                  Deep-dives: {{step.deep-dive.output}}
          - name: standard
            steps:
              - name: standard-brief
                agent: writer
                instructions: |
                  Synthesize a vendor brief.
                  Topic: {{param.topic}}
                  Deep-dives: {{step.deep-dive.output}}
A few things worth flagging:
agent: researcher references the agent that runs a step; skills are referenced by name, too.
outputStep designates the step whose output becomes the workflow's typed result.
{{param.X}} interpolates workflow inputs into step instructions.
{{step.X.output}} interpolates an upstream step's output.
{{item}} is the current value inside a loop iteration.
type: loop steps take an over reference (a step that produced a list, or a list-typed param).
type: loop steps run their nested steps once per item, in parallel, on virtual threads.
type: branch steps take a from reference (a step whose output selects the branch).
branches: lists mutually exclusive steps (or sets of steps), with default used for unrecognized values.
The framework loads agentican-catalog.yaml from the classpath, or you can specify where it's loaded from:
Properties files
agentican.catalog-config=/etc/agentican/agentican-catalog.yaml
Note: Agents, skills, and workflows can also be defined via a fluent builder API.
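The loop semantics described above (one run of the nested steps per item, in parallel, on virtual threads) can be illustrated with plain JDK APIs. The following is a minimal sketch, not Agentican's implementation: `runLoop`, `render`, and the echoing `analyze` function are hypothetical stand-ins, and only the `{{item}}` placeholder is handled. It assumes Java 21 or later for `Executors.newVirtualThreadPerTaskExecutor()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class LoopFanOutSketch {

    // Substitutes the current loop value into a step's instruction template (hypothetical helper).
    public static String render(String template, String item) {
        return template.replace("{{item}}", item);
    }

    // Runs the step once per item, each on its own virtual thread,
    // and collects the results in the same order as the input items.
    public static List<String> runLoop(List<String> items, String template, Function<String, String> step) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (String item : items) {
                Callable<String> task = () -> step.apply(render(template, item));
                futures.add(executor.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for loop items", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A loop item failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the LLM-backed "analyze" step: it just echoes its prompt.
        Function<String, String> analyze = prompt -> "analyzed: " + prompt;
        List<String> out = runLoop(List.of("VendorA", "VendorB"),
                "Deep-dive vendor {{item}}: positioning, key strengths, recent news.", analyze);
        out.forEach(System.out::println);
    }
}
```

Collecting the futures in submission order keeps the loop's output aligned with its input list, even though individual items may finish in any order.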
Step 3: Configure the Models
Agentican reads the engine configuration from `application.properties`. The minimum is one LLM:
Properties files
agentican.llm[0].api-key=${ANTHROPIC_API_KEY}
The provider defaults to `anthropic`, and the model defaults to `claude-sonnet-4-5`. Want OpenAI instead?
Properties files
agentican.llm[0].provider=openai
agentican.llm[0].api-key=${OPENAI_API_KEY}
agentican.llm[0].model=gpt-4o-mini
Want to mix and match? Configure `name`s and reference them per agent in the YAML catalog:
Properties files
agentican.llm[0].name=default
agentican.llm[0].api-key=${ANTHROPIC_API_KEY}
agentican.llm[1].name=efficient
agentican.llm[1].provider=openai
agentican.llm[1].api-key=${OPENAI_API_KEY}
agentican.llm[1].model=gpt-4o-mini
Step 4: Create a Typed Workflow Instance
Define the workflow input and output records:
Java
// ResearchParams.java
public record ResearchParams(String topic, int vendorCount) {}

// VendorBrief.java
import java.util.List;

public record VendorBrief(String topic, List<Vendor> vendors) {
    public record Vendor(String name, String positioning, List<String> strengths) {}
}
Then inject the typed workflow and call it from a REST endpoint:
Java
import jakarta.inject.Inject;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
// Workflow and @AgenticanWorkflow come from the Agentican runtime module.

@Path("/market-brief")
public class VendorBriefResource {

    @Inject
    @AgenticanWorkflow(name = "market-brief")
    Workflow<ResearchParams, VendorBrief> brief;

    @POST
    @Path("/{topic}")
    public VendorBrief generate(@PathParam("topic") String topic) {
        // Blocks until the workflow finishes and its output is parsed into a VendorBrief
        return brief.start(new ResearchParams(topic, 5)).await();
    }
}
Now, test the endpoint:
Shell
curl -X POST http://localhost:8080/market-brief/data%20observability%20platforms
A few things worth flagging, since they're what set this apart from a generic "call an LLM" library:
ResearchParams.vendorCount becomes the workflow parameter vendor_count via SNAKE_CASE mapping.
start() returns a WorkflowRun<VendorBrief>, and await() parses the output step's text into a VendorBrief.
@AgenticanWorkflow(name = "market-brief") resolves the registered workflow at injection time.
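Agentican performs the `vendorCount` to `vendor_count` mapping itself, and its exact naming rules aren't spelled out here. To make the idea concrete, here is a minimal sketch (not the library's code) that reads a record's components reflectively with `Class.getRecordComponents()` and converts each camelCase name to snake_case. `SnakeCaseParams`, `toParams`, and `toSnakeCase` are hypothetical names, and values are flattened to strings for simplicity.

```java
import java.lang.reflect.RecordComponent;
import java.util.LinkedHashMap;
import java.util.Map;

public class SnakeCaseParams {

    // Converts a camelCase identifier such as "vendorCount" to "vendor_count".
    public static String toSnakeCase(String camel) {
        StringBuilder sb = new StringBuilder();
        for (char c : camel.toCharArray()) {
            if (Character.isUpperCase(c)) {
                if (sb.length() > 0) {
                    sb.append('_');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Reads every record component in declaration order and keys its value by the snake_case name.
    public static Map<String, String> toParams(Record params) {
        Map<String, String> out = new LinkedHashMap<>();
        for (RecordComponent rc : params.getClass().getRecordComponents()) {
            try {
                Object value = rc.getAccessor().invoke(params);
                out.put(toSnakeCase(rc.getName()), String.valueOf(value));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read component " + rc.getName(), e);
            }
        }
        return out;
    }

    public record ResearchParams(String topic, int vendorCount) {}

    public static void main(String[] args) {
        System.out.println(toParams(new ResearchParams("data observability platforms", 5)));
        // prints {topic=data observability platforms, vendor_count=5}
    }
}
```

This naive conversion inserts an underscore before every uppercase letter, so an acronym such as `maxURLs` would become `max_u_r_ls`; a production mapping needs an explicit rule for that case.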
Note: WorkflowRun itself exposes future() for a CompletableFuture<R>, and there's a ReactiveWorkflow<P, R> Mutiny variant for Vert.x stacks.
Step 5: Add Agent Tools
Agentican ships two integrations out of the box.
MCP (Model Context Protocol): there is one config block per server, and tools are auto-discovered:
Properties files
agentican.mcp[0].slug=github
agentican.mcp[0].name=GitHub
agentican.mcp[0].url=https://mcp.github.com/sse
agentican.mcp[0].headers.Authorization=Bearer ${GITHUB_TOKEN}
Composio: 100+ SaaS toolkits, including Slack, Notion, Linear, Salesforce, GitHub, and Google Workspace:
Properties files
agentican.composio.api-key=${COMPOSIO_API_KEY}
agentican.composio.user-id=user-123
Tools are referenced by name within agent steps:
YAML
steps:
  - name: research
    agent: researcher
    tools: [github_search_repositories]
    instructions: "Profile open-source vendors in {{param.topic}}."
Where to Go Next
Getting Started: install, configure, and run workflows
Core Concepts: architecture, terminology, and data flow
Workflows & Steps: CDI surface, beans, qualifiers, override patterns
Agents: defining agents, skills, and roles
Getting Started (Quarkus): dependency setup, config, first task
CDI Integration: injection, qualifiers, lifecycle events, bean overrides
REST API: endpoints, SSE streaming, WebSocket, error codes
Observability: Micrometer metrics, OTel tracing, Prometheus queries
Abstract
This is a continuation of the first article in this series, Building a Spring AI Assistant with MCP Servers: A Step-by-Step Tutorial, and addresses a serious concern when moving from prototype to production: security.
The Problem
The MCP specification recommends that MCP servers using HTTP as their transport layer be secured with OAuth 2.0 access tokens. In practice, plenty of teams don't have the surrounding infrastructure (an authorization server, token introspection, and operational maturity) ready when they start exposing internal tools to an AI assistant. But the traffic still needs to be authenticated.
This article walks through a simpler scheme that fits that gap: per-server API keys carried in a custom HTTP header. The MCP server only authorizes requests that present a valid key; the MCP client inspects each outbound request at runtime and attaches the right header for the right destination.
We'll use Spring AI 1.1.4, MCP Spring Security 0.1.5, and Spring Security on Java 25. The setup involves three applications:
telecom-assistant: the AI host and MCP client (port 8080)
invoice-mcp-server: exposes invoice tools and keeps API keys in PostgreSQL (port 8081)
vendor-mcp-server: exposes vendor tools and keeps a single API key in memory (port 8082)
The two servers use different storage strategies on purpose, to show both ends of the spectrum. Every MCP server has its own API key id and secret. The picture below sketches the flow and the requirements to accomplish that.
To follow along, switch to the 2-main branch of the designated GitHub repository. Resolving the TODOs there accomplishes this goal.
Securing the Vendor Service (In-Memory Keys)
TODO 1. This is the simpler case.
Start by adding the security dependencies in pom.xml:
XML
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
    <groupId>org.springaicommunity</groupId>
    <artifactId>mcp-server-security</artifactId>
    <version>0.1.5</version>
</dependency>
TODO 2. Since the API keys are kept in memory in this case, they are declared in the application.properties file, still sourced from environment variables.
Properties files
api.key.id = ${API_KEY_ID}
api.key.secret = ${API_KEY_SECRET}
TODO 3. The main part of this enhancement is the security configuration, for which the following @Configuration class is added.
Java
@EnableWebSecurity
@Configuration
public class SecurityConfig {

    @Value("${api.key.id}")
    private String apiKeyId;

    @Value("${api.key.secret}")
    private String apiKeySecret;

    // Holds the single API key this server accepts
    @Bean
    ApiKeyEntityRepository<ApiKeyEntity> apiKeyRepository() {
        return new InMemoryApiKeyEntityRepository<>(
                List.of(ApiKeyEntityImpl.builder()
                        .name("API key")
                        .id(apiKeyId)
                        .secret(apiKeySecret)
                        .build()));
    }

    // Every request must be authenticated with a valid key sent in the vendor-x-api-key header
    @Bean
    SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        return http.authorizeHttpRequests(auth -> auth.anyRequest().authenticated())
                .with(McpApiKeyConfigurer.mcpServerApiKey(),
                        apiKeyConfig -> apiKeyConfig.apiKeyRepository(apiKeyRepository())
                                .headerName("vendor-x-api-key"))
                .build();
    }
}
A single ApiKeyEntity instance is constructed and stored in an InMemoryApiKeyEntityRepository. Then, when the SecurityFilterChain is built, an McpApiKeyConfigurer (a SecurityConfigurerAdapter) is applied, which addresses two concerns: it sets the expected security header name, vendor-x-api-key, and it registers the repository that stores the server API key.
At this point, the MCP server is secured.
To communicate successfully, an MCP client must send HTTP requests that contain the required header in the following form:
Plain Text
"vendor-x-api-key": [api-key-id].[api-key-secret]
where api-key-id and api-key-secret are replaced with the values configured above.
To test this, the MCP Inspector [Resource 3] is used again; before connecting to the running server, the authentication data is configured by setting the vendor-x-api-key header to the known id.secret value.
Securing the Invoice Service (Keys in PostgreSQL)
Switching to the invoice-mcp-server, the enhancements here are a bit more complex, as the API keys are stored in an external repository.
TODO 4. Again, the security dependencies are added to the pom.xml file, as before.
TODO 5. As API keys are stored in the database, more precisely in the ServerApiKeys table, a mapping entity is created.
Java
@Table("ServerApiKeys")
public class ServerApiKey {

    public static final String COL_SERVER = "Server";
    public static final String COL_KEY_ID = "KeyId";
    // A key is unique per (Server, KeyId); used as the upsert conflict target
    public static final String ON_CONFLICT_CLAUSE = String.format("(%s,%s)", COL_SERVER, COL_KEY_ID);

    @PkColumn("Id")
    private int id;

    @Column(COL_SERVER)
    private String server;

    @Column(COL_KEY_ID)
    private String keyId;

    @Column("KeySecret")
    private String keySecret;

    ...
}
As the Asentinel ORM library is already on this module's classpath, it is used to manage these entities; hence the class is decorated with its annotations.
TODO 6. Just as for the vendor server, the security configuration needs an ApiKeyEntityRepository. Here the approach is more general: the interface is implemented directly, tailored to the database-backed storage.
Java
public class DbApiKeyEntityRepository
        implements ApiKeyEntityRepository<DbApiKeyEntityRepository.InvoiceApiKeyEntity> {

    private final OrmOperations orm;

    public DbApiKeyEntityRepository(OrmOperations orm) {
        this.orm = orm;
    }

    // Looks the key up by (Server = "invoice-mcp", KeyId); returns null when no such key exists
    @Override
    public InvoiceApiKeyEntity findByKeyId(@NonNull String keyId) {
        return orm.newSqlBuilder(ServerApiKey.class)
                .select()
                .where()
                .column(ServerApiKey.COL_SERVER).eq("invoice-mcp").and()
                .column(ServerApiKey.COL_KEY_ID).eq(keyId)
                .execForOptional()
                .map(serverApiKey -> new InvoiceApiKeyEntity(keyId, serverApiKey.getKeySecret()))
                .orElse(null);
    }
}
As every record (API key) in the table is uniquely identified by Server and KeyId, whenever a request is received, the repository looks the key up and returns an implementation of the ApiKeyEntity interface. In our case, that is an InvoiceApiKeyEntity (a static nested class of DbApiKeyEntityRepository) built from the database entity upon retrieval:
Java
public static final class InvoiceApiKeyEntity implements ApiKeyEntity {

    private final String id;

    @Nullable
    private String secret;

    private InvoiceApiKeyEntity(String id, @Nullable String secret) {
        this.id = id;
        this.secret = secret;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public @Nullable String getSecret() {
        return secret;
    }

    // Clears the secret from memory once it is no longer needed
    @Override
    public void eraseCredentials() {
        this.secret = null;
    }

    @Override
    public InvoiceApiKeyEntity copy() {
        return new InvoiceApiKeyEntity(id, secret);
    }
}
It is good practice to keep the secret of a ServerApiKey entity encoded in the database. In this tutorial, the default encoder, bcrypt, is used.
To check the repository, the following simple integration test is used.
Java
@SpringBootTest
@Transactional
class DbApiKeyEntityRepositoryTest {

    private DbApiKeyEntityRepository apiKeyRepository;

    @Autowired
    private OrmOperations orm;

    private final PasswordEncoder passwordEncoder =
            PasswordEncoderFactories.createDelegatingPasswordEncoder();

    @BeforeEach
    public void setUp() {
        apiKeyRepository = new DbApiKeyEntityRepository(orm);
    }

    @Test
    void provisionServerApiKey() {
        ServerApiKey serverApiKey = new ServerApiKey();
        serverApiKey.setServer("invoice-mcp");
        serverApiKey.setKeyId("api-key-id");
        // The delegating encoder stores a {bcrypt}-prefixed hash by default
        serverApiKey.setKeySecret(passwordEncoder.encode("api-key-secret"));

        orm.upsert(serverApiKey, PostgresJdbcFlavor.UPSERT_CONFLICT_PLACEHOLDER,
                ServerApiKey.ON_CONFLICT_CLAUSE);

        DbApiKeyEntityRepository.InvoiceApiKeyEntity apiKey =
                apiKeyRepository.findByKeyId(serverApiKey.getKeyId());

        Assertions.assertNotNull(apiKey);
        Assertions.assertEquals(serverApiKey.getKeyId(), apiKey.getId());
        Assertions.assertEquals(serverApiKey.getKeySecret(), apiKey.getSecret());
    }
}
TODO 7. Once this is completed, the security configuration is set up just as before. The only difference is that the ApiKeyEntityRepository implementation created above is used instead of an in-memory one.
Java
@Configuration
@EnableWebSecurity
public class SecurityConfig {

    private OrmOperations orm;

    @Autowired
    public void setOrm(OrmOperations orm) {
        this.orm = orm;
    }

    // Database-backed repository instead of the in-memory one used by the vendor server
    @Bean
    ApiKeyEntityRepository<DbApiKeyEntityRepository.InvoiceApiKeyEntity> apiKeyRepository() {
        return new DbApiKeyEntityRepository(orm);
    }

    @Bean
    SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        return http.authorizeHttpRequests(auth -> auth.anyRequest().authenticated())
                .with(McpApiKeyConfigurer.mcpServerApiKey(),
                        apiKeyConfig -> apiKeyConfig.apiKeyRepository(apiKeyRepository())
                                .headerName("invoice-x-api-key"))
                .build();
    }
}
At this point, the invoice-mcp server is secured as well; this can be verified with the MCP Inspector.
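With both servers secured, each one performs essentially the same check on every request: split the header value into key id and secret, look the id up, and compare the secrets. The stdlib-only sketch below illustrates that flow; it is not the mcp-server-security implementation. The class and method names are hypothetical, splitting at the first `.` assumes key ids contain no dots, and secrets are compared as plain strings with `MessageDigest.isEqual`, whereas the real setup presumably matches the presented secret against the bcrypt-encoded value through a Spring Security `PasswordEncoder`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.Optional;

public class ApiKeyHeaderCheck {

    // Hypothetical parsed form of a header value such as "api-key-id.api-key-secret".
    public record Credentials(String id, String secret) {}

    // Splits at the first '.', so the id must not contain a dot (the secret may).
    public static Optional<Credentials> parse(String headerValue) {
        if (headerValue == null) {
            return Optional.empty();
        }
        int dot = headerValue.indexOf('.');
        if (dot <= 0 || dot == headerValue.length() - 1) {
            return Optional.empty(); // missing id or missing secret
        }
        return Optional.of(new Credentials(headerValue.substring(0, dot), headerValue.substring(dot + 1)));
    }

    // Looks the key id up and compares the secrets without stopping at the first differing byte.
    public static boolean isAuthorized(String headerValue, Map<String, String> secretsById) {
        return parse(headerValue)
                .filter(c -> secretsById.containsKey(c.id()))
                .map(c -> MessageDigest.isEqual(
                        c.secret().getBytes(StandardCharsets.UTF_8),
                        secretsById.get(c.id()).getBytes(StandardCharsets.UTF_8)))
                .orElse(false);
    }

    public static void main(String[] args) {
        Map<String, String> store = Map.of("api-key-id", "api-key-secret");
        System.out.println(isAuthorized("api-key-id.api-key-secret", store)); // true
        System.out.println(isAuthorized("api-key-id.wrong", store));          // false
        System.out.println(isAuthorized("no-dot-here", store));               // false
    }
}
```

A plain `String.equals` can return as soon as a character differs, which in principle leaks timing information about the stored secret; that is why the sketch avoids it.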
Making the Client Send the Right Header to the Right Server
Both servers are locked down. Now the client needs to know that requests to http://localhost:8081/mcp-invoice should carry the invoice-x-api-key header and requests to http://localhost:8082/mcp-vendor should carry vendor-x-api-key. A clean way to encode this is a chain of responsibility of resolvers.
TODO 8. The expected API key ids and secrets for the two servers are configured in application.properties and, for convenience, read from environment variables. For simplicity, both servers read the same variables here, although a real implementation would use distinct keys.
Properties files
mcp.server.api-key.parameters.invoice.id = ${API_KEY_ID}
mcp.server.api-key.parameters.invoice.secret = ${API_KEY_SECRET}
mcp.server.api-key.parameters.vendor.id = ${API_KEY_ID}
mcp.server.api-key.parameters.vendor.secret = ${API_KEY_SECRET}
They are then mapped into a @ConfigurationProperties-annotated class.
Java
@ConfigurationProperties(McpServerApiKeyProperties.CONFIG_PREFIX)
public class McpServerApiKeyProperties {

    public static final String CONFIG_PREFIX = "mcp.server.api-key";

    // Keyed by server name ("invoice", "vendor")
    private final Map<String, ApiKeyParams> parameters = new HashMap<>();

    public Map<String, ApiKeyParams> getParameters() {
        return parameters;
    }

    public record ApiKeyParams(String id, String secret) {}
}
TODO 9. As two MCP servers are involved, whenever the LLM instructs the AI host to query one of them, the destination must be resolved at runtime. This is implemented as a chain of MCP server resolvers whose common interface is McpServerResolver. It is generic and declares two methods as part of its contract.
Java
public interface McpServerResolver<T> {

    Optional<T> resolve(URI uri);

    default String id() {
        return getClass().getSimpleName();
    }
}
The central method, which each implementer must define, receives the destination URI and attempts to match it to one of the available servers.
If it succeeds, the result is used further. The second method identifies the current resolver; it has a default implementation and can help later in the resolving process (logging, etc.).
As the links in this chain behave similarly, the next piece is an abstract common implementation of the interface above.
Java
abstract class AbstractMcpServerResolver<T> implements McpServerResolver<T> {

    private static final Logger log = LoggerFactory.getLogger(AbstractMcpServerResolver.class);

    private final McpServerResolver<T> next;

    protected AbstractMcpServerResolver(McpServerResolver<T> next) {
        this.next = next;
    }

    @Override
    public Optional<T> resolve(URI uri) {
        if (uri == null) {
            return Optional.empty();
        }
        log.debug("[{}]: Checking request towards {}.", id(), uri);

        // Try this link first
        Optional<T> result = resolveSpecific(uri);
        if (result.isPresent()) {
            log.debug("[{}]: Resolved target endpoint {}.", id(), uri);
            return result;
        }

        // End of the chain: nothing matched
        if (next == null) {
            log.debug("[{}]: No next resolver configured.", id());
            return Optional.empty();
        }

        // Otherwise, delegate to the next link
        log.debug("[{}]: Target endpoint {} not resolved. Delegating to [{}].", id(), uri, next.id());
        return next.resolve(uri);
    }

    protected abstract Optional<T> resolveSpecific(URI endpoint);
}
Each link in the chain defines its specific matching logic in the Optional<T> resolveSpecific(URI endpoint) method. Here, all links work the same way, so the following common implementation is enough.
Java
public class UrlMcpServerResolver extends AbstractMcpServerResolver<ApiKeyHeader> {

    private static final Logger log = LoggerFactory.getLogger(UrlMcpServerResolver.class);

    private final URI serverUri;
    private final ApiKeyHeader header;

    public UrlMcpServerResolver(McpServerResolver<ApiKeyHeader> nextResolver,
                                String serverUrl, ApiKeyHeader header) {
        super(nextResolver);
        this.serverUri = URI.create(serverUrl);
        this.header = header;
    }

    // Matches only when the request URI equals the configured server URL exactly
    @Override
    protected Optional<ApiKeyHeader> resolveSpecific(URI endpoint) {
        if (serverUri.equals(endpoint)) {
            log.debug("[{}]: Target endpoint {} and config URL {} match.", id(), endpoint, serverUri);
            return Optional.of(header);
        }
        log.debug("[{}]: Target endpoint {} and config URL {} don't match.", id(), endpoint, serverUri);
        return Optional.empty();
    }
}
TODO 10. The resolveSpecific() method above decides whether the current request targets a particular server. If it does, an ApiKeyHeader object is returned so that it can be used further.
Java
public record ApiKeyHeader(String name, String value) {}
TODO 11. The last step is the security @Configuration class that glues together the pieces created above.
Java
@Configuration
@EnableConfigurationProperties({McpServerApiKeyProperties.class})
public class SecurityConfig {

    private static final Logger log = LoggerFactory.getLogger(SecurityConfig.class);

    private McpStreamableHttpClientProperties mcpClientProps;
    private McpServerApiKeyProperties mcpServerApiKeys;

    @Autowired
    public void setMcpClientProps(McpStreamableHttpClientProperties mcpClientProps) {
        this.mcpClientProps = mcpClientProps;
    }

    @Autowired
    public void setMcpServerApiKeys(McpServerApiKeyProperties mcpServerApiKeys) {
        this.mcpServerApiKeys = mcpServerApiKeys;
    }

    // Header value format expected by the servers: <key id>.<key secret>
    @Bean
    ApiKeyHeader invoiceApiKeyHeader() {
        var apiKey = mcpServerApiKeys.getParameters().get("invoice");
        return new ApiKeyHeader("invoice-x-api-key", String.format("%s.%s", apiKey.id(), apiKey.secret()));
    }

    @Bean
    ApiKeyHeader vendorApiKeyHeader() {
        var apiKey = mcpServerApiKeys.getParameters().get("vendor");
        return new ApiKeyHeader("vendor-x-api-key", String.format("%s.%s", apiKey.id(), apiKey.secret()));
    }

    // The vendor resolver is asked first and delegates to the invoice resolver; null ends the chain
    @Bean
    McpServerResolver<ApiKeyHeader> serverResolver() {
        var mcpProps = mcpClientProps.getConnections();
        var mcpInvoice = mcpProps.get("invoice");
        var mcpVendor = mcpProps.get("vendor");

        return new VendorMcpServerResolver(
                new InvoiceMcpServerResolver(null,
                        String.format("%s%s", mcpInvoice.url(), mcpInvoice.endpoint()),
                        invoiceApiKeyHeader()),
                String.format("%s%s", mcpVendor.url(), mcpVendor.endpoint()),
                vendorApiKeyHeader());
    }

    // Runs before every outgoing MCP HTTP request and adds the matching API key header, if any
    @Bean
    McpSyncHttpClientRequestCustomizer requestCustomizer() {
        return (builder, method, endpoint, body, context) -> {
            log.info("MCP Client request: method={}, endpoint={}, body={}", method, endpoint, body);
            serverResolver()
                    .resolve(endpoint)
                    .ifPresent(apiKeyHeader -> builder.header(apiKeyHeader.name(), apiKeyHeader.value()));
        };
    }
}
The McpServerResolver returned by serverResolver() is used by the McpSyncHttpClientRequestCustomizer to analyze each request and add the necessary security header.
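The configuration above instantiates VendorMcpServerResolver and InvoiceMcpServerResolver, which aren't listed in the article. Judging from the log output below (each link reports its own name, while the matching messages come from UrlMcpServerResolver), they are most likely thin subclasses of UrlMcpServerResolver. The self-contained sketch below reproduces the chain under that assumption; the two subclasses, the condensed resolver (logging dropped, abstract base folded in), and the hard-coded URLs and key values are illustrative only.

```java
import java.net.URI;
import java.util.Optional;

public class ResolverChainDemo {

    public record ApiKeyHeader(String name, String value) {}

    public interface McpServerResolver<T> {
        Optional<T> resolve(URI uri);

        default String id() {
            return getClass().getSimpleName();
        }
    }

    // Condensed AbstractMcpServerResolver + UrlMcpServerResolver: match exactly, else delegate.
    public static class UrlMcpServerResolver implements McpServerResolver<ApiKeyHeader> {
        private final McpServerResolver<ApiKeyHeader> next;
        private final URI serverUri;
        private final ApiKeyHeader header;

        public UrlMcpServerResolver(McpServerResolver<ApiKeyHeader> next, String serverUrl, ApiKeyHeader header) {
            this.next = next;
            this.serverUri = URI.create(serverUrl);
            this.header = header;
        }

        @Override
        public Optional<ApiKeyHeader> resolve(URI uri) {
            if (uri == null) {
                return Optional.empty();
            }
            if (serverUri.equals(uri)) {
                return Optional.of(header);
            }
            return next == null ? Optional.empty() : next.resolve(uri);
        }
    }

    // Assumed thin subclasses: they exist only so that id() reports a distinct name per link.
    public static class InvoiceMcpServerResolver extends UrlMcpServerResolver {
        public InvoiceMcpServerResolver(McpServerResolver<ApiKeyHeader> next, String url, ApiKeyHeader h) {
            super(next, url, h);
        }
    }

    public static class VendorMcpServerResolver extends UrlMcpServerResolver {
        public VendorMcpServerResolver(McpServerResolver<ApiKeyHeader> next, String url, ApiKeyHeader h) {
            super(next, url, h);
        }
    }

    // Same wiring order as serverResolver(): vendor first, then invoice, then end of chain.
    public static McpServerResolver<ApiKeyHeader> chain() {
        var invoice = new InvoiceMcpServerResolver(null, "http://localhost:8081/mcp-invoice",
                new ApiKeyHeader("invoice-x-api-key", "api-key-id.api-key-secret"));
        return new VendorMcpServerResolver(invoice, "http://localhost:8082/mcp-vendor",
                new ApiKeyHeader("vendor-x-api-key", "api-key-id.api-key-secret"));
    }

    public static void main(String[] args) {
        var resolver = chain();
        System.out.println(resolver.resolve(URI.create("http://localhost:8081/mcp-invoice"))); // invoice header
        System.out.println(resolver.resolve(URI.create("http://localhost:8082/mcp-vendor")));  // vendor header
        System.out.println(resolver.resolve(URI.create("http://localhost:9999/other")));       // Optional.empty
    }
}
```

Because matching relies on `URI.equals`, a request to `/mcp-invoice/` (trailing slash) would fall through the whole chain and go out without a header; comparing scheme, host, port, and a path prefix would be a more forgiving rule.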
Watching It Work
With everything in place, the MCP servers are restarted, together with the telecom-assistant. If the prompt in the screenshot below is issued, the invoice server should be queried and the response received accordingly. The server-resolving process can be observed in the AI host logs.
Plain Text
DEBUG i.m.c.t.HttpClientStreamableHttpTransport - Sending message JSONRPCRequest[jsonrpc=2.0, method=tools/call, id=2902fe67-2, params=CallToolRequest[name=get-invoices-by-pattern-on-number, arguments={pattern=vdf}, meta={}]]
INFO c.h.t.config.SecurityConfig - MCP Client request: method=POST, endpoint=http://localhost:8081/mcp-invoice, body={"jsonrpc":"2.0","method":"tools/call","id":"2902fe67-2","params":{"name":"get-invoices-by-pattern-on-number","arguments":{"pattern":"vdf"},"_meta":{}}}
DEBUG c.h.t.c.r.AbstractMcpServerResolver - [VendorMcpServerResolver]: Checking request towards http://localhost:8081/mcp-invoice.
DEBUG c.h.t.c.r.UrlMcpServerResolver - [VendorMcpServerResolver]: Target endpoint http://localhost:8081/mcp-invoice and config URL http://localhost:8082/mcp-vendor don't match.
DEBUG c.h.t.c.r.AbstractMcpServerResolver - [VendorMcpServerResolver]: Target endpoint http://localhost:8081/mcp-invoice not resolved. Delegating to [InvoiceMcpServerResolver].
DEBUG c.h.t.c.r.AbstractMcpServerResolver - [InvoiceMcpServerResolver]: Checking request towards http://localhost:8081/mcp-invoice.
DEBUG c.h.t.c.r.UrlMcpServerResolver - [InvoiceMcpServerResolver]: Target endpoint http://localhost:8081/mcp-invoice and config URL http://localhost:8081/mcp-invoice match.
DEBUG c.h.t.c.r.AbstractMcpServerResolver - [InvoiceMcpServerResolver]: Resolved target endpoint http://localhost:8081/mcp-invoice.
DEBUG i.m.c.t.HttpClientStreamableHttpTransport - Received SSE stream response, using line subscriber
DEBUG i.m.spec.McpSchema - Received JSON message: {"jsonrpc":"2.0","id":"2902fe67-2","result":{"content":[{"type":"text","text":"[{\"id\":8,\"number\":\"vdf-tf-rev-1\",\"date\":\"2025-05-20\",\"vendor\":{\"id\":2,\"name\":\"Vodafone\"},\"serviceType\":{\"id\":3,\"name\":\"TollFree\"},\"status\":\"UNDER_REVIEW\",\"total\":10.44},{\"id\":7,\"number\":\"vdf-mpls-app-1\",\"date\":\"2025-05-10\",\"vendor\":{\"id\":2,\"name\":\"Vodafone\"},\"serviceType\":{\"id\":4,\"name\":\"MPLS\"},\"status\":\"APPROVED\",\"total\":80.44},{\"id\":6,\"number\":\"vdf-lo-paid-1\",\"date\":\"2025-06-10\",\"vendor\":{\"id\":2,\"name\":\"Vodafone\"},\"serviceType\":{\"id\":5,\"name\":\"Local\"},\"status\":\"PAID\",\"total\":85.44}]"}],"isError":false}}
The invoice server validates the invoice-x-api-key header against its database-backed repository, and the tool call proceeds.
Final Notes
API key authentication is a pragmatic stepping stone, not an end state. In a production environment, OAuth 2.0 remains the recommended approach, and Spring Security supports both. Even with API keys in play, take the basics seriously: store secrets encoded (bcrypt or stronger), rotate them, use a distinct key per server, and combine them with TLS so the headers aren't visible in transit.
The resolver-chain pattern on the client side gives you a natural place to add more rules later (token-fetch logic for OAuth, region-based routing, anything URI-shaped) without touching the rest of the AI host.
The next (and last) article in this series concludes the tutorial by instrumenting the chat client with advisors for memory, token tracking, and logging, and ends with several takeaways.
Resources
[1] The source code for the Spring AI Telecom Assistant
[2] asentinel-orm project
[3] MCP Inspector
A Small Program and a Dot Matrix Printer
In the early 1990s, one of the applications I worked on ran on a single PC in a small office. The program generated invoices and printed them on a dot matrix printer. The interface was text-based, the hardware was limited, and the system served only a handful of users. The application was built using Clipper and early PC-based database tools. It solved a very specific problem: automating billing and record keeping for a local business that previously relied on manual ledgers. By today's standards, the system would appear extremely simple. Yet for that organization, it represented a meaningful step toward digital operations.
Three decades later, software systems operate at a global scale. Applications run across distributed cloud infrastructure, serve millions of users, and increasingly incorporate artificial intelligence. During these thirty years, I have had the opportunity to work through several major transitions: from standalone PC applications to enterprise Java platforms, service-oriented architectures, cloud platforms, and now AI-driven systems. Looking back across these transitions, one lesson becomes clear: technology waves succeed not because they introduce new tools, but because they unlock new categories of business value.
7 Technology Waves I Have Seen in 30 Years of Software
Over three decades in software engineering, I have observed several major technology waves. Each wave introduced new architectural patterns and development tools, but more importantly, each expanded the scope of problems that software could address. These waves can be roughly summarized as:
Standalone PC applications
Client–server systems
Enterprise middleware and Java platforms
Service-oriented architecture
Cloud and SaaS platforms
Microservices and cloud-native systems
AI-driven systems
Each stage brought new possibilities for businesses and changed how software systems were designed.
Wave 1: Standalone PC Applications
In the early era of personal computing, software was primarily designed to automate specific tasks within a single organization. Typical applications included:
Accounting systems
Billing and invoice generation
Inventory tracking
Payroll management
These systems often ran on individual computers or small LANs. User interfaces were basic, and printed reports were a central part of the workflow. Despite their simplicity, these applications delivered an important transformation: they helped organizations move from manual record-keeping to digital data management. However, the systems were typically isolated and lacked integration across departments.
Wave 2: Client–Server Systems
As networking technologies improved, software systems evolved from standalone applications to client–server architectures. Multiple users could now interact with centralized databases through networked applications. This enabled the emergence of ERP systems, which integrated multiple business functions into a single system. For the first time, organizations could connect workflows across departments, such as:
Finance
Inventory
Procurement
Operations
Human resources
The value created during this phase was significant. Software moved from automating individual tasks to connecting entire organizations. This allowed business leaders to gain greater operational visibility and make decisions based on integrated data.
Wave 3: Enterprise Middleware and Java Platforms
As businesses began building larger and more complex systems, enterprise middleware platforms emerged to support scalable application architectures. Enterprise Java platforms and application servers such as Oracle WebLogic Server and IBM WebSphere became central components of enterprise IT systems.
These platforms enabled the development of mission-critical applications in industries such as:
Financial services
Banking
Payment systems
Large enterprise platforms
During this phase, software architecture began to emphasize:
Transaction management
Scalability
Distributed computing
Enterprise integration
For many organizations, these platforms formed the backbone of their digital infrastructure.
Wave 4: Service-Oriented Architecture
As enterprises deployed more systems, integration became a major challenge. Service-oriented architecture (SOA) introduced a model in which business capabilities could be exposed as reusable services that interacted across applications. This allowed organizations to integrate:
Internal enterprise systems
Partner platforms
Payment processing systems
Enterprise workflows
Although many SOA implementations became complex, the concept of service-based architecture influenced later models such as microservices.
Wave 5: Cloud Computing and SaaS
Cloud computing addressed one of the biggest historical limitations in enterprise systems: infrastructure rigidity. Traditionally, organizations had to purchase hardware upfront and estimate future capacity requirements. Cloud computing introduced elastic infrastructure, allowing systems to scale dynamically. This shift enabled the growth of:
SaaS platforms
Digital startups
Global service ecosystems
Cloud computing significantly accelerated innovation by lowering the barrier to launching new digital services.
Wave 6: Microservices and Cloud-Native Systems
As digital platforms expanded to a global scale, monolithic architectures became difficult to manage. Microservices architectures introduced systems composed of smaller, independently deployable services. This model enabled organizations to:
Scale systems more efficiently
Deploy updates more frequently
Organize development teams around independent services
Microservices became a foundation for many modern digital platforms.
Wave 7: AI-Driven Systems
Today, the software industry is entering the early stages of the next potential transformation: AI-driven systems. AI is already being used in areas such as:
AI-assisted coding
Automated customer support
Data analysis and insights
Workflow automation
However, most current applications focus primarily on productivity improvements. While valuable, these improvements represent incremental changes rather than a fundamental transformation. The true impact of AI will emerge when it begins enabling new types of business capabilities. Examples could include systems that:
analyze operational data continuously
detect emerging risks or opportunities
adapt workflows dynamically
support complex decision-making in real time
In these scenarios, AI becomes an active participant in business operations.
The Pattern Behind Every Software Revolution
Looking across these seven waves reveals a consistent pattern. Technology innovations often begin as tools used by developers. Over time, organizations discover how to use these tools to create new forms of business value. Once these new capabilities become clear, entire industries reorganize around them. This pattern has repeated multiple times across the history of software. The internet enabled global digital businesses. Cloud computing enabled service-based software delivery. Microservices enabled hyperscale platforms. The question now is whether AI will unlock the next generation of business capabilities.
The Real Opportunity for AI
At present, many organizations are experimenting with AI primarily as a productivity tool. But the real opportunity lies in something much larger. AI has the potential to enable systems that are:
Adaptive
Intelligent
Capable of assisting decision-making
Able to respond dynamically to changing conditions
When AI becomes embedded directly into operational systems, it may transform how businesses function.
The organizations that learn to harness these capabilities first may define the next era of the software industry. Final Thoughts Every technology wave in software history has expanded the scale of what software can achieve. From automating individual tasks to connecting entire organizations, from enabling global digital businesses to supporting massive cloud platforms, each wave has built upon the previous one. AI may represent the next step in this evolution. But, like every transformation before it, its success will depend not on the technology itself, but on the new business value it ultimately enables. The next generation of industry leaders will likely be those who discover how to use AI not simply to build software faster, but to reimagine what software systems can do for business.
A new paradigm: not a replacement for data engineering, but a fundamental shift in where engineering effort concentrates.
If you were to ask a data engineer about their week, I am sure they would not describe anything exciting. Most of their time goes to data wrangling: messy upstream data, inconsistent date formats, null values that are not really null, and vendor exports that rename columns without noting the change in the documentation. Nobody actually wants to do these tasks, yet they end up occupying most of the week. The classical approaches to data engineering, such as schema validation, manually coded transforms, and algorithmic outlier detection, work well but are reactive to change. As a consequence, pipeline crisis management becomes an endless priority for senior engineers, and valuable analytical work is pushed down the queue.
In the new paradigm, the large language model is not a data processing engine; Spark and dbt still do that work. Instead, the LLM acts as an intelligent coordination layer that understands data in terms of meaning, not merely structure. In this article, we explain what that looks like, where it applies, and how to implement it in runnable form.
Given a column labeled "rev_usd" containing entries like 1200.5, -1, and “n/a”, an LLM would infer that the column represents revenue in US dollars, that -1 is probably a sentinel value, and that “n/a” represents missing data. It would do this without any rules being written in advance. LLMs can also generate the appropriate cleaning code and justify it in plain English. A regex can match patterns and catch generic null representations, but it cannot do this.
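For contrast, here is a minimal sketch of the rules-based side in Python, using a hypothetical `rules_based_clean` helper and a regex of common null tokens (both are illustrations, not part of any library). The regex catches "n/a" because it matches a known token, but nothing in the rule can tell that -1 is a sentinel rather than a real value; that judgment depends on context such as the column name and the rest of the values.

```python
import re
from typing import List, Optional, Union

# A typical rules-based cleaner: a regex for generic null tokens (hypothetical list).
NULL_TOKENS = re.compile(r"^\s*(n/?a|null|none|nan|-|)\s*$", re.IGNORECASE)


def rules_based_clean(values: List[Union[str, float]]) -> List[Optional[float]]:
    """Map generic null tokens to None and parse everything else as a float."""
    cleaned: List[Optional[float]] = []
    for v in values:
        if isinstance(v, str) and NULL_TOKENS.match(v):
            cleaned.append(None)      # "n/a" is caught: it matches a known token
        else:
            cleaned.append(float(v))  # -1 passes through as if it were real revenue
    return cleaned


print(rules_based_clean([1200.5, -1, "n/a"]))  # [1200.5, -1.0, None]
```

Adding `-1` to the rule would fix this column but break any column where -1 is a legitimate value, which is exactly the kind of context-dependent decision the article hands to the LLM.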
The 4 Integration Points That Actually Matter
LLMs should not be used as general-purpose processors or as a replacement for ETL. They have high latency and high cost, and their results are not deterministic. The best approach is a surgical one: use them only in the four areas where semantic understanding clearly beats anything a rules-based approach can express.
1. Schema inference and mapping. Given a raw CSV or JSON document with no field documentation, an LLM can infer column semantics, suggest appropriate data types, and propose a mapping to the canonical target schema. This is where the time savings are largest: a new vendor integration that previously took a data engineer half a day can now be completed in seconds.
2. Generation of transformation code. Engineers describe a cleaning rule in plain language, and the LLM generates Pandas or SQL code, including unit tests and comments. The output is a code artifact that goes through version control and code review and runs on your own infrastructure, rather than a hot-path, black-box API call to an external service.
3. Anomaly classification with contextual awareness. Rule engines struggle to classify values such as "-1", "unknown", or "0", which may represent missing data, sentinel values, or legitimate records.
LLMs take the context into account and reason about it, which helps eliminate false positives in data quality alerts.
4. Automated documentation and flow narration. With LLMs, documenting transformations and the reasons for changes becomes trivial, which greatly improves pipeline maintainability. For organizations with high engineering turnover or complicated compliance requirements, this alone justifies the integration cost.
Reference Architecture
One operational constraint allows no leeway: LLMs never interact with complete production datasets. They only ever see metadata or small samples that are statistically representative of the larger set. The data transformation itself is done by deterministic code, which is fast, auditable, and inexpensive. Given this, LLM calls happen only during the initial phases of pipeline setup or when a schema change event is triggered. Steady-state LLM costs are effectively zero, because you only pay for model calls when something genuinely new needs processing.
Implementation: Schema Mapping
Let's first build the schema mapper, which takes a sample DataFrame and a target schema and produces column mappings in structured JSON. The objective is to get the model to reason about column semantics rather than rely solely on naming. cust_id and customer_identifier should map to customer_id, and so should an unlabeled column whose values behave exactly like customer IDs.
Python
import json

import anthropic  # Anthropic Python SDK; reads ANTHROPIC_API_KEY from the environment
import pandas as pd


def llm_schema_mapper(
    source_df: pd.DataFrame,
    target_schema: dict,
    sample_size: int = 100
) -> dict:
    """
    Infers column mappings from source to target schema using an LLM.

    Returns: { source_col: target_col_or_None }

    The LLM reasons about column semantics, not just name similarity:
    'cust_id', 'CustomerID', and 'id_customer' all map to 'customer_id'.
    """
    client = anthropic.Anthropic()

    sample = source_df.head(sample_size).to_json(
        orient="records", indent=2
    )
    dtypes = source_df.dtypes.to_dict()
    null_rates = (
        source_df.isnull().mean().round(3).to_dict()
    )
    # Build the metadata JSON outside the f-string to keep the prompt readable
    metadata = json.dumps({
        "columns": list(source_df.columns),
        "dtypes": {k: str(v) for k, v in dtypes.items()},
        "null_rates": null_rates,
    }, indent=2)

    prompt = f"""You are a senior data engineer performing schema mapping.

SOURCE COLUMN METADATA:
{metadata}

SAMPLE ROWS ({sample_size} rows):
{sample}

TARGET SCHEMA:
{json.dumps(target_schema, indent=2)}

Map each source column to the most semantically appropriate target field.
Consider: column name, data type, value patterns, and null rates.
If no target mapping is appropriate, use null.

Respond ONLY with a valid JSON object in this exact shape:
{{
  "column_mapping": {{"source_col": "target_col_or_null"}},
  "confidence": {{"source_col": 0.0_to_1.0}},
  "notes": {{"source_col": "brief_reasoning_if_ambiguous"}}
}}"""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        temperature=0,  # Minimize run-to-run variation for schema work
        messages=[{"role": "user", "content": prompt}]
    )

    result = json.loads(response.content[0].text)

    # Surface low-confidence mappings for human review
    low_confidence = {
        col: result["notes"].get(col, "-")
        for col, conf in result["confidence"].items()
        if conf < 0.7
    }
    if low_confidence:
        print(f"⚠ Low-confidence mappings flagged for review: {low_confidence}")

    return result["column_mapping"]


# ─── Usage ───────────────────────────────────────────────────────
source = pd.DataFrame({
    "cust_id": ["C001", "C002", "C003"],
    "full_name": ["Alice Johnson", "Bob Smith", "Carol Wu"],
    "dob": ["1990-04-12", "1985-11-03", "1992-07-22"],
    "rev_usd": [1200.5, -1, 980.0],    # -1 is a sentinel value
    "src_sys": ["CRM", "CRM", "ERP"],  # source system tag; no mapping
})

target = {
    "customer_id": "string: unique customer identifier",
    "name": "string: full name",
    "date_of_birth": "date: ISO 8601 (YYYY-MM-DD)",
    "revenue": "float: USD, null if unknown",
}

mapping = llm_schema_mapper(source, target)
print(json.dumps(mapping, indent=2))
# → {
#   "cust_id": "customer_id",
#   "full_name": "name",
#   "dob": "date_of_birth",
#   "rev_usd": "revenue",
#   "src_sys": null   ← correctly excluded
# }
Note temperature=0: for schema mapping, you want the same answer every time. A temperature of 0 minimizes variation between runs, although it does not strictly guarantee identical output. Also note the confidence scoring: mappings below 0.7 are surfaced for human review before the pipeline runs.
Implementation: Generating Transformation Code
Code generation is the second, and higher-value, primitive. The engineer describes the intended cleaning in plain terms, and the LLM generates an executable Python function, complete with comments. Crucially, the function is checked into version control and tested before it ever sees production data; this is not a live LLM call inside your data pipeline.
Python
import ast
import textwrap

import anthropic  # Anthropic Python SDK
import pandas as pd


def generate_transform(
    description: str,
    sample_df: pd.DataFrame,
    validate: bool = True
) -> str:
    """
    Generates a validated `transform(df) -> df` function from a
    natural-language description of the cleaning rules required.

    Args:
        description: Plain-English cleaning instructions
        sample_df: Representative sample for type/value context
        validate: Parse-check the generated code before returning

    Returns:
        A string containing a complete Python function definition,
        ready to be written to disk and reviewed before execution.
    """
    client = anthropic.Anthropic()

    prompt = f"""You are a senior data engineer. Write production-ready Python code to clean a pandas DataFrame.
DATAFRAME SCHEMA:
{sample_df.dtypes.to_string()}

SAMPLE (5 rows):
{sample_df.head(5).to_string()}

CLEANING REQUIREMENTS:
{textwrap.dedent(description)}

Write a function with this signature:
def transform(df: pd.DataFrame) -> pd.DataFrame:

Rules:
- Use only pandas (stdlib ok, no third-party imports)
- Handle NaN and edge cases explicitly; never assume clean input
- Add a one-line comment on each non-trivial step
- Return a copy, never mutate the input DataFrame
- Raise ValueError with a descriptive message for unrecoverable data issues

Respond with ONLY the function definition. No markdown, no imports, no test code.
The function must be importable as written."""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2000,
        temperature=0,
        messages=[{"role": "user", "content": prompt}]
    )

    code = response.content[0].text.strip()

    if validate:
        try:
            ast.parse(code)  # Syntax check only; behavior still needs tests and review
        except SyntaxError as e:
            raise ValueError(f"LLM produced invalid Python: {e}") from e

    return code


# ─── Usage ───────────────────────────────────────────────────────
sample = pd.DataFrame({
    "cust_id": ["C001", "C002"],
    "full_name": ["  Alice Johnson  ", "bob smith"],
    "dob": ["4/12/1990", "1985-11-03"],
    "rev_usd": [1200.5, -1],
})

fn_code = generate_transform("""
    1. Rename: cust_id → customer_id, full_name → name,
       dob → date_of_birth, rev_usd → revenue
    2. Strip leading/trailing whitespace from all string columns
    3. Title-case the 'name' column
    4. Parse 'date_of_birth' to datetime, handling both M/D/YYYY
       and YYYY-MM-DD input formats
    5. Replace revenue values of -1 or below 0 with NaN
       (these are sentinel 'unknown' values, not real negatives)
""", sample)

print(fn_code)
# → def transform(df: pd.DataFrame) -> pd.DataFrame:
#       df = df.copy()
#       # Rename to canonical target schema
#       df = df.rename(columns={...})
#       ...
# Save for review before execution:
import os

os.makedirs("transforms", exist_ok=True)
with open("transforms/vendor_a_clean.py", "w") as f:
    # The generated function uses `pd`, but the prompt forbids imports, so add one
    f.write("import pandas as pd\n\n\n" + fn_code + "\n")
# → PR review → merge → pipeline picks it up on next run
The generated function is written to disk, reviewed in a pull request, and merged before it touches any production data. The LLM is a code-generation assistant, not a runtime dependency.
Traditional vs. LLM-Augmented Pipelines
The operational difference is most apparent when a new data source is introduced or an existing one changes. In a traditional pipeline, this means a support ticket, a sprint story, and several hours of engineering work. In an LLM-augmented pipeline, the process is simply re-running the schema mapper and reviewing a pull request for the generated transformation function.
Real-World Use Cases
Across the industries where this pattern has begun to emerge, the common theme is a high variety of data sources, rapidly changing systems, and a requirement for clear, human-readable justification of automated decisions.
Multi-vendor data integration: Data ingestion systems in finance and healthcare integrate a growing number of vendors.
Schema mapping that previously demanded a week of integration effort is now completed in less than a day.
Self-healing pipelines: An LLM can analyze the stack trace and the upstream schema to determine the cause of a failure, suggest a patch, and route it for human approval before the pipeline is re-executed.
Data quality narration: Summarizing Great Expectations output, turning technical assertion failures into a call to action for non-technical stakeholders.
Modernizing legacy ETL: Reading decades-old COBOL or PL/SQL transformation logic and translating it into documented, testable dbt models.
Unstructured data extraction: Parsing free-text fields, emails, PDFs, and similar sources into structured records, for example turning medical notes into FHIR resources and contracts into structured obligation tables.
Dynamic feature engineering: Automatically proposing and generating feature transformations for ML pipelines greatly shortens the path from raw data to model-ready features.
Integrating With Your Existing Stack
You won't need to rebuild your data platform to implement this approach, because LLM augmentation integrates with the data tools your teams already use.
dbt: LLMs can write the SQL for dbt models from free-text descriptions of business transformations, or populate the description fields in your schema YAML files. The LLM reads your source models and other relevant documentation so that the generated documentation is consistent and accurate.
Airflow: LLM schema inference can be encapsulated as a separate task at the upstream end of your DAG (directed acyclic graph). Downstream tasks can read the structured JSON this task produces as XCom (cross-communication) values.
If a schema change is detected, a human-approval gate should be triggered before the rest of the DAG is allowed to execute.
GX: Once LLM schema inference is complete, automatically create Great Expectations (GX) expectation suites that match the inferred types, value ranges, and nullability. This keeps your data contracts and your expectations in sync without additional effort.
Spark: LLMs can also create PySpark transformation functions for new source schemas, which run inside your existing Spark jobs. Only the transformation functions are new; the rest of the orchestration layer stays the same.
Challenges You Need to Solve Before Shipping
None of this comes for free. Five failure modes will come back to haunt you if you apply LLM-generated code to sensitive data without addressing them.
Risk of hallucination: LLMs can produce erroneous column mappings, or transformations that are syntactically valid but semantically incorrect. Make validation of generated transform code mandatory (unit tests, sample-data tests, output schema tests), and require human review for high-stakes mappings. The confidence score in the schema mapper exists to prompt that review.
Non-determinism: The same prompt may produce different outputs. Pin the model version, and set temperature to 0 for code and schema generation tasks. Keep every prompt, every response, and every artifact the LLM produces so you can reproduce the reasoning.
Data privacy: Do not send live production datasets to external LLM APIs. Send metadata and small, anonymized samples instead. For sensitive data, use a self-hosted model or a provider with a compliant data processing agreement.
Cost and latency at scale: Every LLM call adds processing time and cost.
Limit LLM use to pipeline initialization, schema changes, and error handling. The design above incorporates this.
Auditability and governance: Regulators and data governance teams commonly ask why a transformation was implemented. Prompt and completion logs should therefore be treated like pipeline data lineage, as first-class pipeline artifacts. In regulated settings, code changes made by LLMs should always require human review and sign-off before deployment, just as you would require for any code that touches sensitive data.
What This Means for Data Engineering as a Discipline
Pipelines built around LLMs still require data engineering skill; the engineering simply moves to the system around the LLM. How should data be sampled so the LLM sees statistically representative edge cases? What passing criteria should the test suite impose on every LLM-generated transformation function before it is merged? How are good human review gates for schema mapping established? And what does auditability mean for generated transformations?
These are system design issues, not implementation problems. Implementation is increasingly shifting to the model. As models take on more of the boilerplate work, the architecture, the engineer's contracts, guardrails, and validation harnesses become more valuable. The paradigm shift is not "LLMs replace data engineers." It is "LLMs take over the specification layer, and data engineers build the frameworks that make LLM outputs safe, auditable, and reliable."
Key Takeaways
Use LLMs as a semantic coordination layer, not as substitutes for Spark, dbt, or Pandas.
Apply them at the four leverage points: schema mapping, code generation, anomaly classification, and documentation.
LLMs see only metadata and small samples, never full production datasets. The privacy and cost rationale for this constraint is non-negotiable.
Treat generated code as an untrusted dependency: validate it, test it against sample data, version it, and subject it to human review before it interacts with any sensitive, regulated, or high-stakes data.
Always set temperature=0 and pin model versions for schema and code generation.
Treat all prompts and completions as first-class pipeline artifacts, logged for audit and reproducibility.
In this paradigm, the most valuable data engineers will be the ones who design the validation harnesses, review gates, and audit systems that make LLM-generated outputs production-safe.
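The "untrusted dependency" takeaway can be made concrete with a small validation gate that runs before human review. This is a minimal sketch under stated assumptions: rows are plain lists of dicts so the example runs with the standard library alone (with pandas you would compare `df.columns` and check the input against a copy with `df.equals`), the helper names are hypothetical, and `GENERATED` is a hand-written stand-in for model output, not a real completion. Note that `exec` is not a sandbox; in practice, run generated code in an isolated, locked-down process.

```python
import copy
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def load_generated_transform(code: str) -> Callable[[List[Row]], List[Row]]:
    """Execute generated source in a fresh namespace and return its `transform`.
    exec() is NOT a sandbox; isolate untrusted code in a separate process."""
    namespace: Dict[str, Any] = {}
    exec(compile(code, "<generated>", "exec"), namespace)
    fn = namespace.get("transform")
    if not callable(fn):
        raise ValueError("generated code does not define a callable `transform`")
    return fn


def validate_transform(code: str, sample: List[Row], expected_columns: List[str]) -> List[Row]:
    """Gate generated code: it must run on the sample, leave the input
    untouched, and emit exactly the expected output columns."""
    transform = load_generated_transform(code)
    before = copy.deepcopy(sample)  # snapshot to detect in-place mutation
    output = transform(sample)
    if sample != before:
        raise ValueError("transform mutated its input")
    for i, row in enumerate(output):
        if sorted(row) != sorted(expected_columns):
            raise ValueError(f"row {i} has columns {sorted(row)}, expected {sorted(expected_columns)}")
    return output


# Hand-written stand-in for an LLM completion (hypothetical)
GENERATED = '''
def transform(rows):
    out = []
    for r in rows:
        rev = r["rev_usd"]
        # Negative values such as -1 are the unknown sentinel
        out.append({"customer_id": r["cust_id"].strip(), "revenue": None if rev < 0 else rev})
    return out
'''

sample = [{"cust_id": " C001 ", "rev_usd": 1200.5}, {"cust_id": "C002", "rev_usd": -1}]
print(validate_transform(GENERATED, sample, ["customer_id", "revenue"]))
# [{'customer_id': 'C001', 'revenue': 1200.5}, {'customer_id': 'C002', 'revenue': None}]
```

A gate like this only rejects obviously broken output; it complements, rather than replaces, unit tests and pull-request review.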
Most RAG evaluations focus on the answer:
Is the answer correct?
Does it appear grounded?
Did retrieval metrics improve after a pipeline change?
Those checks are useful. But they do not tell the full story. Two runs can produce nearly the same answer while relying on different supporting evidence. A small change in chunking, retrieval depth, overlap, or reranking may leave the output looking stable on the surface. Underneath, the cited documents or spans may have changed. Once that happens, reproducibility and auditability become weaker. Another engineer may not be able to reproduce the same support trail. A reviewer may not be able to explain why the system relied on one source in one run and a different source in another.
What Gets Missed
In many teams, the workflow is simple:
Change the retriever. Run the benchmark.
Change chunk size. Run the benchmark.
Compare answer quality, faithfulness, latency.
If the answer still looks fine, the change is treated as safe. But a system can keep giving roughly the same answer while quietly shifting its evidence. That weakens debugging, regression analysis, and traceability.
A Simple Example
Take an internal HR assistant. A user asks: How many days per week can hybrid employees work remotely?
Run A
The assistant answers: Hybrid employees may work remotely up to three days per week.
It cites:
HR_Policy_2024.pdf
Remote_Work_FAQ.pdf
Run B
After a retrieval configuration change, the assistant gives almost the same answer: Hybrid employees may work remotely up to three days per week.
But now it cites:
Manager_Guidelines.pdf
Team_Handbook.pdf
The answer barely moved. But the evidence did. That changes the review question. It is no longer only about whether the answer sounds reasonable. It becomes a question of which source the system treated as authority, and why that changed.
Did it move from policy to guidance?
Did it move from a formal source to a looser one?
Would HR or Legal accept both?
Could the team explain the shift if they had to?
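The HR example reduces to a simple document-level comparison. Below is a minimal sketch, assuming each run's citations are available as a set of document names; the Jaccard overlap used here is one common choice of similarity measure, not necessarily what any particular tool computes.

```python
from __future__ import annotations


def jaccard(a: set[str], b: set[str]) -> float:
    """Share of cited documents two runs have in common (1.0 = identical)."""
    if not a and not b:
        return 1.0  # two runs that cite nothing are trivially identical
    return len(a & b) / len(a | b)


answer_a = "Hybrid employees may work remotely up to three days per week."
answer_b = "Hybrid employees may work remotely up to three days per week."

cites_a = {"HR_Policy_2024.pdf", "Remote_Work_FAQ.pdf"}
cites_b = {"Manager_Guidelines.pdf", "Team_Handbook.pdf"}

print(answer_a == answer_b)       # True: the answer did not move
print(jaccard(cites_a, cites_b))  # 0.0: the evidence changed completely
```

An answer-only regression check would pass this pair; the evidence check flags it immediately.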
That is where evidence stability stops feeling academic and starts looking operational.
The Subtler Case
There is another version of the same problem. Sometimes the system keeps citing the same document, but the cited span inside that document changes. At the document level, it looks stable. At the span level, it may not be stable at all. That matters because two spans from the same document can play very different roles. One may contain the rule. Another may contain context, an exception, or a weaker explanation. A document-level check can say nothing changed while the actual justification shifted in a meaningful way.
So, How Do You Check It?
At that point, the obvious question is: how do you check this in a repeatable way? That is what pushed me to build RagCiteCheck. It is a post-hoc checker for evidence stability. Feed it retrieval logs from different runs, and it compares what came back at the document level and, when span hashes are available, at the span level. It is not trying to replace answer scoring or retrieval benchmarking. It answers a narrower question: Did the evidence stay stable across runs?
The workflow is small:
Run your pipeline a few times.
Change one retrieval setting each time.
Save what came back.
Compare.
A simple CLI flow is enough. Validate the logs:
Plain Text
python -m ragcitecheck.cli validate --runs ./examples/minimal --out ./out_check
Generate a document-level report:
Plain Text
python -m ragcitecheck.cli report --runs ./examples/minimal --out ./out_report_doc --evidence-key doc
Generate a document-plus-span-level report:
Plain Text
python -m ragcitecheck.cli report --runs ./examples/minimal --out ./out_report_span --evidence-key doc_span
You get a quick view of:
How similar the runs are
Which queries behave inconsistently
Where most of the movement is happening
What You Start Noticing
Once you look at evidence across runs, the same patterns keep showing up:
The answer stays similar, but the cited documents change
The document stays the same, but the cited span changes
Some queries remain stable while others keep shifting
Small retrieval tweaks have larger effects than expected
None of that is obvious if you only compare answers.
Why It’s Worth Checking
If a team says a RAG workflow is reproducible, that should mean more than the answer looked similar again. It should also mean the support trail can be rerun, inspected, and compared. If a team says a RAG system is auditable, that should mean more than the answer came with citations. It should also mean the cited basis does not quietly shift under routine pipeline changes without anyone noticing. That is the value of checking evidence stability. It gives teams a way to inspect changes that would otherwise stay hidden behind stable-looking answers.
Final Thought
Most teams already compare answers across runs. They should compare the evidence, too. Because sometimes the answer did not change. But the evidence did.
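The document-level versus span-level distinction from "The Subtler Case" can be sketched in a few lines. This assumes a hypothetical log shape in which each run maps a query to a list of (document, span hash) pairs; it is not RagCiteCheck's actual log format or implementation, and the query names and hashes below are made-up sample data.

```python
from typing import Dict, List, Tuple

# Hypothetical log shape: one dict per run, {query: [(doc_id, span_hash), ...]}
Evidence = Dict[str, List[Tuple[str, str]]]


def classify_queries(run_a: Evidence, run_b: Evidence) -> Dict[str, str]:
    """Label each query present in both runs as 'stable', 'span_shift', or 'doc_shift'."""
    labels: Dict[str, str] = {}
    for query in sorted(run_a.keys() & run_b.keys()):
        docs_a = {doc for doc, _ in run_a[query]}
        docs_b = {doc for doc, _ in run_b[query]}
        if docs_a != docs_b:
            labels[query] = "doc_shift"   # different documents cited
        elif set(run_a[query]) != set(run_b[query]):
            labels[query] = "span_shift"  # same documents, different spans
        else:
            labels[query] = "stable"
    return labels


run_a = {
    "remote_days": [("HR_Policy_2024.pdf", "h1"), ("Remote_Work_FAQ.pdf", "h2")],
    "pto_carryover": [("HR_Policy_2024.pdf", "h7")],
    "expense_limit": [("Finance_Policy.pdf", "h9")],
}
run_b = {
    "remote_days": [("Manager_Guidelines.pdf", "h4"), ("Team_Handbook.pdf", "h5")],
    "pto_carryover": [("HR_Policy_2024.pdf", "h8")],  # same document, different span
    "expense_limit": [("Finance_Policy.pdf", "h9")],
}
print(classify_queries(run_a, run_b))
# {'expense_limit': 'stable', 'pto_carryover': 'span_shift', 'remote_days': 'doc_shift'}
```

The `pto_carryover` query is the subtle case: a document-only comparison would call it stable.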
The Problem Most Teams Get Wrong
Every data engineer has lived this moment. A dbt model fails at 3 AM. You pull up the logs, see a type conversion error, and start digging through SQL. You check recent commits. Nothing changed. You inspect the upstream data. Nothing looks off. You rerun the job. It passes. You shrug, label it a transient issue, and go back to sleep. Then it happens again two weeks later.
I want to talk about a specific category of pipeline failure that burns more engineering hours than almost anything else I've seen. It looks real. It carries a real error message, a real stack trace, a failed model, and a timestamp you can point to in your incident log. But no matter how long you stare at the SQL, you will not find the bug. Because there isn't one.
I call these false failures: jobs that break not because your logic is wrong, but because your query contains an implicit assumption that the execution engine has been quietly honoring until the moment it decides not to.
What This Actually Looks Like in Practice
The pattern becomes obvious once someone points it out to you. A model fails with an error referencing a specific data value. A cast that didn't work. A type mismatch. You investigate and find that the offending value has existed in the table for months. It is not new. It did not arrive in last night's load. You rerun the job without changing anything. It passes.
This is not a flaky test. It is not Snowflake having a bad day. It is a determinism problem, and it has a precise mechanical cause.
Here's how to spot it:
The failure is intermittent. It does not reproduce consistently, even in the same environment with the same data.
The error references a value that has been present in the source table for a long time.
A retry with zero intervention passes cleanly.
And if you bother to pull up the Snowflake query profile, you'll notice the execution plan differs between the failing run and the passing one.
That last detail is the key to everything.
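The signals in the checklist can be encoded as a simple triage rule. This is a hypothetical sketch: `RunRecord`, its fields, and the seven-day threshold are illustrative assumptions, and how you derive a plan fingerprint from the Snowflake query profile is left to your own tooling. It compares one failing run with one clean retry, so intermittency across many runs would need a separate history check.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunRecord:
    """Hypothetical summary of one job run. Field names are illustrative,
    assembled from orchestrator logs and warehouse query history."""
    succeeded: bool
    code_version: str                           # e.g. git SHA of the dbt project
    plan_fingerprint: str                       # e.g. hash of the query profile's operator tree
    error_value_age_days: Optional[int] = None  # age of the row named in the error, if any


def looks_like_false_failure(
    failed: RunRecord, retry: RunRecord, min_value_age_days: int = 7
) -> bool:
    """Encode the spot-it checklist: same code, a clean retry, a different
    plan, and an offending value that predates the failure."""
    return (
        not failed.succeeded
        and retry.succeeded                                    # retry with zero intervention passed
        and failed.code_version == retry.code_version          # nothing changed in the code
        and failed.plan_fingerprint != retry.plan_fingerprint  # engine chose a different plan
        and failed.error_value_age_days is not None
        and failed.error_value_age_days >= min_value_age_days  # value is old, not newly loaded
    )


failed = RunRecord(False, "a1b2c3", "plan-17", error_value_age_days=120)
retry = RunRecord(True, "a1b2c3", "plan-04")
print(looks_like_false_failure(failed, retry))  # True
```

A `True` here is a prompt to open the query profile, not a verdict; the point is to stop treating such runs as transient noise.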
Why Snowflake Makes This Possible
Here is something fundamental about Snowflake that most people working with it never fully internalize: it does not guarantee a consistent execution plan between runs of the same query. Snowflake's optimizer is adaptive. It reassesses strategy at runtime based on conditions that shift constantly. Several things influence which plan it picks:
Micro-partition metadata gets updated asynchronously after data loads. The same query issued before and after a stats refresh can follow a meaningfully different path through the data.
Warehouse size and concurrency affect parallelism thresholds. What gets broadcast-joined on an XS warehouse may be hash-joined on a Medium. The plan changes because the available compute changed.
Data volume growth pushes the optimizer across execution thresholds over time. A strategy that worked at 10 million rows may get abandoned entirely at 500 million.
Implicit type coercion is where things get dangerous. When two columns of different types meet in a join condition, Snowflake resolves the mismatch at runtime. Which side gets cast, and at what point during execution, can vary by plan.
That last one is where most false failures are born.
A Real Example: The Join That Only Breaks on Tuesdays
Here's a model I've seen variations of in at least a dozen production pipelines:
SQL
SELECT o.*, e.event_type
FROM orders o
LEFT JOIN events e
  ON o.order_id = e.event_key
Looks harmless. But there's a mismatch hiding in plain sight. orders.order_id is typed as NUMBER. events.event_key is typed as VARCHAR. Snowflake allows this. It resolves the mismatch by casting the VARCHAR side to a number at join time. Since the vast majority of rows in the events table contain numeric-looking keys, this works fine. Almost all the time.
But buried somewhere in that events table is a single row where event_key = 'INVALID_VAL'. It has been there for months. Nobody noticed because it never caused a problem.
Here's why: on most runs, Snowflake's optimizer prunes away the micro-partition containing that row before the cast is ever attempted. The query completes without incident. The problematic value is never touched.
Then one day, the optimizer picks a different plan. Maybe the warehouse was busier. Maybe a stats refresh shifted the pruning boundaries. Maybe the table crossed a size threshold. Whatever the cause, that partition gets scanned first this time. The cast is attempted. And the job dies with:
Plain Text
Numeric value 'INVALID_VAL' is not recognized
Same query. Same data. Same code. The error is real. The bug is not.
The Diagnostic Shift That Actually Helps
The standard debugging instinct here is exactly wrong. You pull up the SQL. You check git blame. You inspect recent loads. You are anchored to the assumption that a code defect exists, and when it doesn't, you waste hours proving a negative.
A better question to ask is: what is this query assuming about how the engine will execute it, and is that assumption guaranteed?
When you approach a failing run through that lens, the investigation changes completely. Instead of reviewing business logic line by line, you open the query profile and compare the execution plan between the failing run and the last passing one. You look for differences in operator ordering, join strategies, partition pruning behavior, and the point at which type resolution happens.
This reframes the diagnosis from "what broke in my code" to "what changed in how the engine chose to run this." That is a different investigation entirely. And it leads to fixes that actually hold.
The Fix: Make the Contract Explicit
The instinctive response to an intermittent failure is to add a retry. That solves the alert. It does not solve the problem. Worse, it hides the problem by reducing the frequency of visible failures while the underlying fragility quietly grows.
The real fix is eliminating the implicit assumption.
In the join example above, that means one line:
SQL
-- Before: implicit cast, optimizer decides how and when
ON o.order_id = e.event_key

-- After: explicit cast, behavior is identical across all plans
ON o.order_id::VARCHAR = e.event_key
This is a small change with an outsized effect. The query no longer depends on the optimizer choosing a pruning strategy that avoids the bad row. Type resolution is now the query's responsibility, not the engine's. Behavior is consistent regardless of warehouse size, concurrency, or data volume.
And here's the part that might feel counterintuitive: the fix makes the bad row behave the same way on every run. INVALID_VAL is now compared as a string, so it simply never matches an order, no matter which partition the optimizer scans first. If it is genuinely bad data, you want to learn about it deterministically on every run, through a test like the ones described below, not discover it randomly once a quarter when the optimizer happens to scan the wrong partition first. One trade-off to check: a string comparison treats '0123' and '123' as different keys, whereas the numeric cast treated them as equal, so confirm that event_key values are formatted canonically before switching.
Building Pipelines That Don't Depend on Luck
Type coercion in joins is the most common source of false failures, but the principle extends further. Anywhere your SQL relies on implicit behavior, behavior the engine provides by convention rather than by contract, you have a latent failure waiting for the right conditions.
A few practices that materially reduce this risk in dbt and Snowflake environments:
Cast early and cast explicitly. Use your dbt staging models to lock down column types at the source layer. A staging model that casts event_key::VARCHAR explicitly means every downstream model inherits that contract. No one has to guess. No one has to re-cast.
Test join columns at the boundary. Add not_null, accepted_values, or custom schema tests on columns that participate in join conditions. These run before your models execute. They catch data quality problems at the source, not when they surface as cryptic execution-layer errors three models downstream.
Treat intermittent failures as debt, not noise.
Any job that fails occasionally without a corresponding code change is carrying hidden technical debt. Do not normalize it with retries. Schedule a real investigation. The failure rate will increase over time as data grows and execution plans shift more aggressively.
Use the query profile before you use git blame. When a failure cannot be explained by code review, the Snowflake query profile is your next stop. Compare failing and passing runs side by side. If the plans diverge meaningfully, you are almost certainly looking at a false failure.
Why This Gets Worse Over Time
There is a scaling dimension to this problem that makes it urgent rather than merely interesting. At low data volumes, Snowflake's optimizer tends to be more consistent in its plan selection. The search space is smaller. The pruning decisions are more predictable. As tables grow into the hundreds of millions or billions of rows, execution plans shift more aggressively and more frequently. Thresholds get crossed. Statistics change faster. The optimizer explores more alternatives. Every implicit assumption that has been quietly tolerated at small scale becomes increasingly likely to be exposed at large scale.
This means the pipeline that "works fine" today with a 2% intermittent failure rate will not stay at 2%. It will drift upward as your data grows, and by the time it becomes a serious operational problem, you will have dozens of models carrying the same class of hidden assumption.
The Mental Model Worth Adopting
Write your SQL as if the optimizer will always find the most inconvenient execution path. Assume it will scan the partition you hoped it would skip. Assume it will cast the side you didn't expect. Assume the plan will change tomorrow. If your query breaks under those assumptions, the query needs to be more explicit. Not the optimizer more predictable.
A query that works because the optimizer happens to avoid a problematic data path is not a correct query. It is a lucky one.
And luck is not an engineering strategy. The Takeaway False failures in dbt and Snowflake pipelines are not random. They are not gremlins. They are the predictable result of implicit assumptions meeting a dynamic execution engine that satisfies those assumptions by coincidence rather than by obligation. Recognizing this pattern, and separating it from genuine code bugs, is one of the most valuable diagnostic skills you can develop working in modern cloud data environments. Next time your pipeline fails and the code looks clean: stop auditing the logic. Start auditing the assumptions. Find what your query relies on implicitly. Make it explicit. Build tests that enforce it at the data layer, before the execution engine ever gets the chance to surprise you. Your code was fine. Your contract with the engine wasn't. Now you know the difference.
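Separating false failures from genuine code bugs can be partly automated from run history. The following is a minimal sketch, assuming you already record each model run's name, deployed code version (for example, a git commit SHA), and final status somewhere queryable; the RunRecord type and find_false_failure_suspects function are hypothetical. It flags any model whose runs both succeeded and failed on the same code version, which is the case where code review alone cannot explain the failure.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical run-history record; field names are illustrative."""
    model: str
    code_version: str  # e.g. the git commit SHA that was deployed for this run
    status: str        # "success" or "error"


def find_false_failure_suspects(runs):
    """Map each model to the code versions on which it both succeeded and failed."""
    outcomes = defaultdict(set)
    for run in runs:
        outcomes[(run.model, run.code_version)].add(run.status)

    suspects = defaultdict(list)
    for (model, version), statuses in outcomes.items():
        # Mixed outcomes on identical code likely point at data or plan changes,
        # not at a logic bug introduced by a deployment.
        if {"success", "error"} <= statuses:
            suspects[model].append(version)
    return {model: sorted(versions) for model, versions in suspects.items()}


if __name__ == "__main__":
    history = [
        RunRecord("fct_orders", "a1b2c3", "success"),
        RunRecord("fct_orders", "a1b2c3", "error"),
        RunRecord("fct_orders", "a1b2c3", "success"),
        RunRecord("dim_customers", "d4e5f6", "error"),
        RunRecord("dim_customers", "0789ab", "success"),
    ]
    print(find_false_failure_suspects(history))  # {'fct_orders': ['a1b2c3']}
```

In this toy history, dim_customers is not flagged: its failure and its success happened on different code versions, so reviewing the code change is still the right first step there.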
The Pipeline Did Not Fail Cleanly

Most pipeline failures don't look like "the job failed." Consider a common scenario. A Glue job reads overnight event files, applies business rules, and writes to an Iceberg curated table. The job runs at its scheduled time and errors out partway through. The control table shows SUCCESS for the previous batch and FAILED for the current one, which is what you'd expect.

The problem is what happened between those two states: the job wrote nine of the day's twelve partitions to the staging table before failing. A downstream report ran on its own schedule, picked up the partial data, and the discrepancy didn't surface until a downstream consumer noticed records were missing.

By the time someone looks at the failure, the question is no longer "Why did the job fail?" It's "Is it safe to rerun, and what's already corrupted downstream?"

That's where debugging gets messy. The evidence is spread across CloudWatch logs, Glue run metadata, the source S3 path, record counts, data quality results, target table state, and Iceberg snapshots. An experienced engineer can connect those signals, but it takes time, and a less experienced engineer often misses one. In a busy production environment, that delay leads to blind reruns, duplicate records, overwritten partitions, or worse.

The frustrating part is that the evidence existed. The pipeline just had no structured way to explain itself.

That's the gap a triage layer can fill. Not by fixing the pipeline. Not by changing schemas. Not by restarting jobs. By observing the evidence already produced, classifying the failure, explaining what likely happened, and recommending what to do next.

What Agentic Observability Means

The word "agentic" gets misused a lot right now, especially in data engineering. It's worth being precise. An agentic observability layer is not an LLM with permission to control production. It's a controlled workflow that collects pipeline evidence, builds incident context, classifies the failure against known categories, and produces a structured recommendation. The loop is observe, classify, explain, recommend, and that's where it stops. Everything past "recommend" stays with engineers, deterministic rules, or approval workflows.

The difference from normal alerting is the depth of the output. A normal alert says "Glue job daily_customer_interactions failed." An agentic observability layer should produce something closer to: "The job failed because the input contains a new column not present in the curated schema. The staging write started before the failure, so a blind retry will create duplicate records. Quarantine the batch, review the schema contract, and rerun with the same batch_id after validation."

That difference is what saves time during an incident. The goal isn't replacing engineers. It's reducing the manual triage work needed before someone can make a real decision.

Reference Architecture

This does not need to start as a new platform. The triage layer can sit beside existing Glue pipelines and consume signals that already exist.

Figure 1. Agentic observability flow for AWS Glue pipelines. Pipeline evidence is collected, converted into structured context, analyzed by an LLM triage layer, and returned as a structured incident output.

The component that matters most here is the incident context builder. The LLM should never receive a raw dump of ten thousand log lines. That produces noisy, low-confidence output and burns tokens. The collector should pull a curated set of signals: Glue job name and run ID, status and duration, batch ID, source path, target table, the last fifty error log lines, data quality results, record counts, attempt count, recent deployment version, table snapshot or commit ID, and control table status. That's enough context to analyze the failure without guessing from disconnected log lines.
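A minimal sketch of that curation step, assuming the raw log lines have already been fetched in chronological order (for example, from the job's CloudWatch Logs stream) and that the incident context is a plain dict. The error markers, the REQUIRED_SIGNALS names, and both function names are hypothetical. It drops non-error lines, collapses consecutive repeats, keeps the last fifty error lines, and reports which expected signals are absent.

```python
import re

# Hypothetical error markers; tune them to what your Glue jobs actually log.
ERROR_PATTERN = re.compile(r"ERROR|Exception|Traceback")

# Signals the context builder is expected to supply (names are illustrative).
REQUIRED_SIGNALS = (
    "job_name", "job_run_id", "status", "batch_id", "source_path",
    "target_table", "attempt_count", "control_status", "data_quality_results",
)


def curate_error_lines(raw_lines, max_lines=50, max_chars=500):
    """Keep error-looking lines, collapse consecutive repeats, return the newest max_lines."""
    curated = []
    for raw in raw_lines:
        if not ERROR_PATTERN.search(raw):
            continue
        line = raw.rstrip()[:max_chars]  # cap very long lines such as serialized payloads
        if curated and curated[-1] == line:
            continue  # a retry loop logging the same error adds no new evidence
        curated.append(line)
    return curated[-max_lines:]


def missing_signals(context):
    """Return required signals that are absent or empty in the incident context."""
    return [key for key in REQUIRED_SIGNALS if context.get(key) in (None, "", [], {})]


if __name__ == "__main__":
    logs = ["INFO job started"] + [f"ERROR stage {i} failed" for i in range(80)]
    curated = curate_error_lines(logs)
    print(len(curated), curated[0])  # 50 ERROR stage 30 failed
    print(missing_signals({"job_name": "daily_customer_interactions", "batch_id": None}))
```

Reporting missing signals explicitly lets the prompt say, for instance, that the batch ID is unknown, instead of leaving the model to infer whether a run is a retry.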
Where This Fits

Before going further, one thing worth being honest about: this pattern depends on the platform already having its house in order. The agent can only work with the observability that the platform already has. It is not a substitute for basic pipeline hygiene.

It works when the platform has clear source paths and tracks batch IDs, data quality results, structured logs, table commits, deployment versions, and ownership mapping. Without those signals, the agent has very little to reason over. If a pipeline doesn't track batch IDs, the agent can't reliably tell whether a run is a retry or a new batch. If quality results aren't stored, it can't reason about input validity. If table commits aren't tracked, it can't tell whether the failure happened before or after a write.

LLMs don't create observability. They summarize and reason over the observability that already exists. The teams that get the most out of this pattern are the ones with disciplined data engineering underneath.

Failure Categories

Manual debugging takes time, partly because every failure looks unique at first glance. Most don't stay unique once you classify them. A small fixed set of categories makes the output easier to review, compare, and route.
Failure category | Common signals | Recommended action
--- | --- | ---
Schema drift | New column, missing column, cast failure, contract mismatch | Quarantine the batch and review the schema contract
Data skew | Long-running tasks, shuffle spill, uneven partitions | Repartition or isolate skewed keys
Small file pressure | High file count, slow planning, frequent commits | Compact affected partitions
Source delay | Missing input path, low record count, late file arrival | Wait, retry later, or mark the batch delayed
Code regression | Recent deployment plus transformation error | Roll back or compare with the previous run
Permission issue | Access denied, catalog failure, IAM or Lake Formation error | Fix access policy before retrying
Partial write risk | Failure after write started | Check staging and control tables before rerun
Unknown | Weak or conflicting evidence | Escalate to an engineer with summarized context

The category list isn't only documentation. It's part of the system contract. The agent picks from this list rather than inventing categories on each run, which makes downstream routing tractable. Schema drift can go to the data contract owner. Permission issues route to the platform team. Source delays go to the ingestion owner. Partial write risk triggers a manual review workflow rather than auto-retry. This is what makes the system more useful than a chatbot that summarizes logs.

Structured Incident Output

The output should also be structured. Free-form summaries help humans skim, but they're hard to store, compare, or evaluate over time. JSON works better because it can be written to an incident table and consumed by Slack, Teams, Jira, or ServiceNow without parsing prose.
JSON
{
  "pipeline_name": "daily_customer_interactions",
  "job_run_id": "jr_2026_05_02_001",
  "status": "FAILED",
  "failure_category": "SCHEMA_DRIFT",
  "likely_root_cause": "Input file contains a new column named device_type that is not defined in the curated table schema.",
  "affected_source_path": "s3://raw/events/date=2026-05-02/",
  "affected_table": "curated.customer_interactions",
  "safe_to_retry": false,
  "recommended_action": "Quarantine the batch, update the schema contract, and rerun with the same batch_id after validation.",
  "confidence": 0.87
}

A structured output gives engineers a quick summary, and it gives downstream tools something reliable to use. If safe_to_retry is false, the orchestrator blocks automatic retry. If failure_category is PERMISSION_ERROR, the issue routes to the platform queue. If confidence is low, the system asks for human review. If the same failure category recurs across runs, dashboards can track it over time.

One important framing point: the LLM is not the system of record. The control table, logs, table metadata, and quality checks remain the source of truth. The agent is a reasoning layer that produces structured evidence on top of that.

Implementation Sketch

A simple implementation starts with assembling the incident context. The example below is intentionally simplified. In production, the LLM call should use structured outputs or schema-validated responses rather than free-form text parsing.
Python
# job_run is the "JobRun" dict returned by the AWS Glue GetJobRun API,
# e.g. boto3 glue_client.get_job_run(JobName=..., RunId=...)["JobRun"].
def build_incident_context(job_run, control_record, dq_results, recent_logs):
    completed_on = job_run.get("CompletedOn")  # absent while a run is still in progress
    return {
        "job_name": job_run["JobName"],
        "job_run_id": job_run["Id"],
        "status": job_run["JobRunState"],
        "started_on": str(job_run["StartedOn"]),
        "completed_on": str(completed_on) if completed_on else None,
        "batch_id": control_record.get("batch_id"),
        "source_path": control_record.get("source_path"),
        "target_table": control_record.get("target_table"),
        "attempt_count": control_record.get("attempt_count"),
        "control_status": control_record.get("status"),
        "data_quality_results": dq_results,
        # Only the most recent error lines, never a raw log dump
        "recent_error_logs": recent_logs[-50:],
    }

The classifier receives a fixed category list and explicit rules about what it shouldn't recommend.

Python
import json

def classify_failure(llm_client, incident_context):
    # Serialize as JSON so the model sees stable field names rather than a Python repr.
    context_json = json.dumps(incident_context, indent=2, default=str)
    prompt = f"""
You are analyzing a failed data pipeline run.

Classify the failure into one of these categories:
SCHEMA_DRIFT, DATA_SKEW, SOURCE_DELAY, PERMISSION_ERROR,
CODE_REGRESSION, PARTIAL_WRITE_RISK, SMALL_FILE_PRESSURE, UNKNOWN.

Return only valid JSON with:
failure_category, likely_root_cause, safe_to_retry, recommended_action, confidence.

Rules:
- Do not recommend a retry if there is partial write risk.
- Do not recommend schema changes without human review.
- Do not recommend permission changes without platform approval.
- Use UNKNOWN when evidence is weak or conflicting.

Incident context:
{context_json}
"""
    # llm_client stands in for whatever LLM SDK wrapper you use; it returns raw text here.
    return llm_client.invoke(prompt)

In a real implementation, this prompt should be paired with a strict response schema (failure_category as an enum, likely_root_cause as a string, safe_to_retry as a boolean, recommended_action as a string, confidence as a float between 0 and 1), and the system should reject any output that doesn't match. When the API supports structured outputs, they are the better choice in production. The free-form prompt above is illustrative.
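The schema check itself needs nothing beyond the standard library. The sketch below assumes the model returns a JSON string. The field names and category values come from the prompt; the function names, the 0.7 review threshold, the queue names, and the set of categories that never auto-retry (derived loosely from the recommended actions in the failure-category table) are hypothetical placeholders. It rejects anything off-schema, then applies deterministic gating: no automatic retry unless safe_to_retry is true, permission errors go to the platform queue, and low-confidence output goes to human review.

```python
import json

# Category list and field types mirror the prompt and response schema.
CATEGORIES = frozenset({
    "SCHEMA_DRIFT", "DATA_SKEW", "SOURCE_DELAY", "PERMISSION_ERROR",
    "CODE_REGRESSION", "PARTIAL_WRITE_RISK", "SMALL_FILE_PRESSURE", "UNKNOWN",
})
EXPECTED_TYPES = {
    "failure_category": str,
    "likely_root_cause": str,
    "safe_to_retry": bool,
    "recommended_action": str,
    "confidence": (int, float),
}

# Hypothetical routing policy; queue names and threshold are placeholders.
REVIEW_THRESHOLD = 0.7
OWNER_QUEUES = {
    "SCHEMA_DRIFT": "data_contract_owner",
    "PERMISSION_ERROR": "platform",
    "SOURCE_DELAY": "ingestion_owner",
    "PARTIAL_WRITE_RISK": "manual_review",
}
NO_AUTO_RETRY = frozenset({"SCHEMA_DRIFT", "PERMISSION_ERROR", "PARTIAL_WRITE_RISK", "UNKNOWN"})


class InvalidTriageOutput(ValueError):
    """Raised when the model response does not match the expected schema."""


def validate_triage_output(raw_text):
    """Parse and strictly validate the model's JSON; reject anything off-schema."""
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise InvalidTriageOutput(f"not valid JSON: {exc}") from exc
    if not isinstance(data, dict) or set(data) != set(EXPECTED_TYPES):
        raise InvalidTriageOutput("missing or unexpected fields")
    for field, expected_type in EXPECTED_TYPES.items():
        value = data[field]
        # bool is a subclass of int, so a JSON true must not pass as a confidence score
        is_bool_number = field == "confidence" and isinstance(value, bool)
        if not isinstance(value, expected_type) or is_bool_number:
            raise InvalidTriageOutput(f"{field} has the wrong type")
    if data["failure_category"] not in CATEGORIES:
        raise InvalidTriageOutput("failure_category is not in the fixed list")
    if not 0.0 <= data["confidence"] <= 1.0:
        raise InvalidTriageOutput("confidence must be between 0 and 1")
    return data


def route_incident(triage):
    """Deterministic gate: decides routing and retry eligibility; executes nothing."""
    category = triage["failure_category"]
    low_confidence = triage["confidence"] < REVIEW_THRESHOLD
    if category == "UNKNOWN" or low_confidence:
        queue = "human_review"
    else:
        queue = OWNER_QUEUES.get(category, "pipeline_owner")
    allow_auto_retry = (
        triage["safe_to_retry"] and category not in NO_AUTO_RETRY and not low_confidence
    )
    return {"queue": queue, "allow_auto_retry": allow_auto_retry}


if __name__ == "__main__":
    raw = (
        '{"failure_category": "SCHEMA_DRIFT", "likely_root_cause": "New column device_type", '
        '"safe_to_retry": false, "recommended_action": "Quarantine the batch", '
        '"confidence": 0.87}'
    )
    print(route_incident(validate_triage_output(raw)))
    # {'queue': 'data_contract_owner', 'allow_auto_retry': False}
```

Keeping the gate outside the model means a confidently wrong response can at worst land in the wrong queue; it can never unlock a retry for a category the policy forbids.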
The result gets stored, not acted on:

Python
import datetime
from decimal import Decimal

def store_incident_summary(summary, incident_table):
    # incident_table is assumed to be a boto3 DynamoDB Table resource; summary is the
    # validated triage output merged with pipeline_name and job_run_id from the context.
    incident_table.put_item(
        Item={
            "pipeline_name": summary["pipeline_name"],
            "job_run_id": summary["job_run_id"],
            "failure_category": summary["failure_category"],
            "safe_to_retry": summary["safe_to_retry"],
            "recommended_action": summary["recommended_action"],
            # The DynamoDB resource API rejects Python floats, so store confidence as Decimal
            "confidence": Decimal(str(summary["confidence"])),
            "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        }
    )

The agent writes an explanation. Other systems decide what to do with it.

What the Agent Should Never Decide

This boundary is the most important design choice in the whole pattern, and it's worth being explicit about. An observability agent helps engineers understand a failure. It does not control production data systems. Even at high confidence, certain actions stay out of scope:

- Changing table schemas
- Granting IAM or Lake Formation permissions
- Deleting data
- Marking a partially written batch as successful
- Overriding data quality failures
- Promoting quarantined data
- Rewriting production tables
- Triggering cross-pipeline backfills
- Compacting or expiring table snapshots without approval

These actions move from observability into production control, and that line should stay clear. In regulated or business-critical environments, the safest design lets the agent produce structured evidence and recommendations while deterministic rules, approval workflows, or engineers decide whether anything actually executes.

An agent saying "this looks like schema drift, the batch is not safe to retry" is useful. The same agent updating the curated table schema on its own is not. It's a future incident waiting to happen. Same with permissions: the agent flagging an IAM issue is useful; the agent granting itself access is a security violation.

The trade-off here is real. Letting the agent take action would reduce the mean time to recovery. But the cost of a confident wrong action (silently corrupted data, an unauthorized permission grant, a dropped partition) is much higher than the cost of a few extra minutes of human review. In a regulated data environment, that trade-off is usually easy to justify.

This matters as teams move toward self-healing pipelines. Before a pipeline can safely fix itself, it first has to explain itself reliably, at scale, with measurable accuracy. That bar isn't met yet in most production environments.

Evaluating the Triage Layer

A triage layer should be evaluated like any other production component. "The summary looks good" is not an evaluation.

To check whether the pattern behaves reasonably, a small synthetic evaluation can be assembled across common Glue failure modes. Each scenario includes a short set of log lines, control-table state, data quality results, and table metadata, and the agent is scored on two things: whether it picks the correct failure category, and whether the safe_to_retry decision is appropriate.

This is a starter evaluation, not a benchmark. Ten synthetic scenarios are enough to sanity-check the design. A real production rollout needs hundreds of labeled historical incidents, edge cases, and human-reviewed outcomes. Anything less should be treated as an early prototype, not production validation.
Scenario | Expected category | Agent category | Safe-to-retry decision
--- | --- | --- | ---
Missing source path | SOURCE_DELAY | SOURCE_DELAY | Correct
New column in input | SCHEMA_DRIFT | SCHEMA_DRIFT | Correct
Access denied on catalog table | PERMISSION_ERROR | PERMISSION_ERROR | Correct
Shuffle spill and one long task | DATA_SKEW | DATA_SKEW | Correct
Failure after staging write | PARTIAL_WRITE_RISK | PARTIAL_WRITE_RISK | Correct
Too many small files | SMALL_FILE_PRESSURE | SMALL_FILE_PRESSURE | Correct
Recent code deployment plus null pointer | CODE_REGRESSION | CODE_REGRESSION | Correct
Low record count, no hard error | SOURCE_DELAY | UNKNOWN | Conservative escalation
Cast failure due to bad input value | SCHEMA_DRIFT | SCHEMA_DRIFT | Wrong, recommended retry
Conflicting log signals | UNKNOWN | UNKNOWN | Correct escalation

In a small evaluation like this one, a well-designed classifier should pick the expected category in most scenarios and, more importantly, get the safe-to-retry decision right in nearly all of them. The illustrative results above show the expected category in nine of ten scenarios, eight correct retry decisions, one conservative escalation (the agent returns UNKNOWN rather than guessing), and one wrong call.

That wrong call is the most instructive. On the cast failure, the agent classifies the issue correctly as schema drift but recommends cleanup-and-retry instead of quarantine-and-contract-review. A wrong root cause is inconvenient. A wrong retry recommendation can corrupt data. Safe-retry precision should be weighted higher than classification accuracy when evaluating this kind of system, and that weighting should be reflected in the prompt rules and in the validation rubric.
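One way to encode that weighting in a scoring harness is sketched below, under stated assumptions: each labeled scenario records the expected category, the agent's category, whether a retry was actually safe, whether the agent said safe_to_retry, and the agent's confidence. The Scenario fields, the 3x penalty on unsafe retry advice, and the 0.8 "high confidence" cutoff are hypothetical choices, not part of the evaluation above. Safe-retry precision here is the share of "safe to retry" recommendations that really were safe; the false confidence rate is the share of high-confidence scenarios where the category or the retry call was wrong.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scenario:
    """Hypothetical labeled evaluation record for one synthetic or historical incident."""
    expected_category: str
    agent_category: str
    retry_actually_safe: bool  # ground truth from a human reviewer
    agent_said_safe: bool      # the agent's safe_to_retry value
    confidence: float


def score(scenarios, unsafe_retry_penalty=3.0, high_confidence=0.8):
    """Compute evaluation metrics, weighting unsafe retry advice above category misses."""
    if not scenarios:
        raise ValueError("need at least one scenario")
    total = len(scenarios)
    category_hits = sum(s.expected_category == s.agent_category for s in scenarios)

    said_safe = [s for s in scenarios if s.agent_said_safe]
    truly_safe = sum(s.retry_actually_safe for s in said_safe)
    unsafe_retries = len(said_safe) - truly_safe
    # No "safe to retry" recommendations means none of them were unsafe.
    retry_precision = truly_safe / len(said_safe) if said_safe else 1.0

    confident = [s for s in scenarios if s.confidence >= high_confidence]
    confident_wrong = sum(
        s.expected_category != s.agent_category
        or (s.agent_said_safe and not s.retry_actually_safe)
        for s in confident
    )
    false_confidence_rate = confident_wrong / len(confident) if confident else 0.0

    # Weighted rubric: each category miss costs 1, each unsafe retry costs more.
    weighted_penalty = (total - category_hits) + unsafe_retry_penalty * unsafe_retries
    return {
        "classification_accuracy": category_hits / total,
        "safe_retry_precision": retry_precision,
        "false_confidence_rate": false_confidence_rate,
        "weighted_penalty": weighted_penalty,
    }


if __name__ == "__main__":
    # Toy inputs for illustration only, not results from a real evaluation.
    toy = [
        Scenario("SCHEMA_DRIFT", "SCHEMA_DRIFT", False, False, 0.90),
        Scenario("SOURCE_DELAY", "UNKNOWN", False, False, 0.40),
        Scenario("SCHEMA_DRIFT", "SCHEMA_DRIFT", False, True, 0.85),
        Scenario("DATA_SKEW", "DATA_SKEW", True, True, 0.80),
    ]
    print(score(toy))
```

The multiplier is a judgment call for each team; the point is that a single unsafe retry should move the score more than a harmless misclassification does.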
The metrics worth tracking in production:

Metric | Why it matters
--- | ---
Classification accuracy | Whether the agent identifies the right failure type
Safe-retry precision | Whether retry recommendations are actually safe
False confidence rate | Confident-but-wrong recommendations
Mean triage time | Reduction in manual debugging time
Human override rate | How often engineers reject the recommendation
Cost per incident | LLM and log-processing cost per failed run

False confidence rate deserves attention. A low-confidence wrong answer is manageable because engineers know to scrutinize it. A high-confidence wrong answer is dangerous because teams stop scrutinizing. Confidence belongs in the output, but it should never be treated as truth. It's one signal among several in the routing decision.

Closing

Glue job failures aren't hard because the logs are long. They're hard because the evidence is scattered across logs, run metadata, data quality results, control tables, and table commits, and an engineer has to assemble it before deciding what to do next. An agentic observability layer turns that scattered evidence into a structured incident summary.

The safest version of this pattern is controlled triage, not autonomous repair: observe, classify, explain, recommend, and stop there. Deterministic rules, approval workflows, and engineers decide what happens next.

Before pipelines can fix themselves, they need to explain themselves. That's the work worth doing first.