Big Data

Big data comprises datasets that are massive, varied, and complex, and that can't be handled with traditional tools. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.

Latest Premium Content
Trend Report: Cognitive Databases, Intelligent Data
Refcard #269: Getting Started With Data Quality
Refcard #254: Apache Kafka Essentials

DZone's Featured Big Data Resources

The Real-Time Revolution: Why Blockchain Needs Data Stream Processing

By Gautam Goswami
Blockchain is an extremely data-driven technology because its primary function is to store, verify, and coordinate independent records in a secure, distributed data network. Without this data, no transaction, smart contract execution, or network activity could be validated, which would jeopardize the integrity of the larger functions that depend on that trust. The quality of the data coming into the blockchain affects the accuracy of the whole system. As far as transparency, immutability, and safe decisions are concerned, data is the backbone of blockchain.

Together, blockchain and data streaming bring strong security, transparency, and real-time mechanisms for moving data across the digital world. Blockchain builds a chain of trust by keeping decentralized, tamper-evident records, and data streaming complements it by delivering insights while information is continuously flowing. Together they form a foundation for next-generation applications, supporting innovation, scalability, and better decision-making across industries.

Both blockchain and data streaming are powerful technologies on their own. When combined, however, data streaming can amplify the impact of a blockchain solution.

Real-Time Data Integration

Data streaming platforms, such as Apache Kafka and Apache Flink, continuously process and deliver real-time data. When they are integrated with blockchain, transactions can be updated on the ledger as they arrive, smart contracts can react to live data feeds, and delays are reduced compared to batch processing. For example, IoT sensors streaming temperature data can trigger a blockchain-based smart contract in real time.

Improved Scalability

One major limitation of blockchain systems like Ethereum has been scalability. By leveraging data streaming, we can pre-process and filter large volumes of data before sending it to the blockchain.
This can reduce the number of unnecessary transactions stored on-chain. On top of that, heavy computation can be offloaded from the chain to a stream processing engine available on the data streaming platform. The result is faster and more efficient blockchain performance.

Enhanced Data Integrity and Trust

Blockchain ensures immutability and transparency, while data streaming ensures continuous data flow. Because data stream processing enables continuous validation, filtering, and analysis of data elements before they are written to the ledger, it enhances data integrity and trust in blockchain. Real-time processing helps identify anomalies in the data, detect tampering, and ensure that only accurate, high-quality data enters the blockchain. Combining the two provides a trusted, secure, and transparent ecosystem in which information can be verified quickly and with confidence. Consider a supply chain tracking use case in which real-time shipment data is streamed and permanently recorded.

Better Event-Driven Architectures

Blockchain systems can become more dynamic when combined with an event-driven streaming platform such as Confluent, Amazon Kinesis, or the open-source Apache Kafka. Smart contracts can act as automated responders to streamed events, enabling automation across distributed systems and reducing manual intervention. For example, a payment is automatically released when a delivery event is streamed and confirmed.

Efficient Data Storage Strategy

Not all data needs to be stored on-chain, which is expensive and slow. By leveraging streaming platforms, we can store and process high-volume data off-chain. Streaming platforms can be integrated with streaming databases to store data already processed by stream engines. The blockchain then stores only critical summaries, hashes, or proofs, which maintains efficiency while still allowing verification.
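The off-chain/on-chain split described above can be illustrated with a minimal Kotlin sketch. It is only an illustration under stated assumptions: the `Reading` type, the temperature threshold, and the `batchDigest` helper are hypothetical names that do not come from the article, and no real blockchain or streaming client is involved. The idea shown is that the stream layer filters readings, the full batch stays off-chain, and only a SHA-256 digest of the batch would be recorded on the ledger.

```kotlin
import java.security.MessageDigest

// Hypothetical shape of one off-chain sensor reading (not from the article).
data class Reading(val sensorId: String, val celsius: Double)

// SHA-256 digest of the whole batch, hex-encoded. In this sketch it is the
// only value that would be written on-chain; the batch itself stays off-chain.
fun batchDigest(batch: List<Reading>): String {
    val payload = batch.joinToString("\n") { "${it.sensorId}:${it.celsius}" }
    val bytes = MessageDigest.getInstance("SHA-256")
        .digest(payload.toByteArray(Charsets.UTF_8))
    return bytes.joinToString("") { "%02x".format(it) }
}

fun main() {
    val stream = listOf(Reading("s1", 4.0), Reading("s2", 9.5), Reading("s1", 4.2))

    // Pre-filtering in the stream layer: only readings above an assumed
    // threshold would be forwarded to trigger a smart contract.
    val alerts = stream.filter { it.celsius > 8.0 }

    val digest = batchDigest(stream)
    println("alerts=${alerts.size} digest=$digest")

    // The same batch always yields the same digest; a modified batch does not.
    check(batchDigest(stream) == digest)
    check(batchDigest(stream.dropLast(1)) != digest)
}
```

Anyone holding the off-chain batch can recompute the digest and compare it with the recorded value, which is how a small on-chain proof can vouch for a much larger off-chain dataset.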
Real-Time Analytics and Monitoring

Data stream processing facilitates real-time analytics and monitoring in blockchain by analyzing transaction data as it streams over the network. This enables organizations to detect suspicious activity, monitor system performance, and obtain real-time information on blockchain activity by analyzing transaction patterns. Integrating a real-time stream processing platform turns raw data into actionable intelligence, which improves transparency, responsiveness, and operational efficiency across blockchain ecosystems.

Wrapping Up

Combining data stream processing and blockchain creates an ecosystem that blends real-time intelligence with secure, immutable record-keeping. Blockchain ensures transparency, trust, and data integrity, while stream processing powers instant analysis, continuous monitoring, and real-time decision-making based on that data. When combined, they improve efficiency, enhance security, and enable scalable, data-driven applications. These technologies play an instrumental role in building smarter systems that respond in real time and increase confidence among the organizations relying on that information.
Parallel Kafka Batch Processing With Kotlin Coroutines in Spring Boot

By Erkin Karanlık
Managing high-volume message traffic in distributed architectures is crucial, and so is the efficient use of database and CPU resources. Spring Kafka lets us receive messages in batches through its "BatchMessageListener" structure. However, the processing of those messages often runs into a sequential bottleneck. This article discusses the structure and usage of Kotlin Coroutines in detail. We will examine how to maximize Kafka message processing performance using Structured Concurrency principles and Resource Throttling techniques.

Architectural Bottleneck: Sequential I/O Blocking

With the current Kafka listener:

  • Database or external service calls made for each message directly increase total processing time.
  • If message processing lags behind message arrival and max.poll.interval.ms is exceeded, the consumer is removed from the consumer group. Rebalancing is triggered, and the partitions of that consumer are redistributed to other consumers in the group.

Kotlin
    @KafkaListener(topics = ["usage-pool-topic"])
    fun usagePoolListener(records: List<ConsumerRecord<String, String>>) {
        records.forEach { record ->
            processRecord(record) // Network latency + DB I/O blocking
        }
    }

Solution

1. Batch-Fetch and In-Memory Map Structure

Before any concurrent code runs, the data for all required entities is retrieved in bulk. Multiple separate queries are converted into one batch query before data processing begins, so the N+1 query problem is solved at the application layer. All data is cached once before the work is split into concurrent operations, which significantly reduces our reliance on the database. Using the associateBy function, we transform the data into a map structure with O(1) average access time. This allows each concurrent operation to read data safely from the maps instead of querying the database.
Kotlin
    val messages = records.map {
        objectMapper.readValue(it.value(), UsagePoolRecord::class.java)
    }
    val usagePoolEntities = usagePoolRepository
        .findByIds(messages.map { it.usagePoolId.toBigInteger() })
        .associateBy { it.usagePoolId }
    val lockEntities = lockRepository
        .findByUserIds(messages.map { it.userId })
        .associateBy { it.userId }

2. Structured Concurrency and Memory Management With Chunking

The chunk structure serves two purposes:

  • It prevents all coroutines from being created at once, which avoids unnecessary memory usage.
  • Each chunk writes to the database only after all of its coroutines have completed, which avoids unnecessary connection pool consumption.

Kotlin
    messages.chunked(150).forEach { chunk ->
        // Each chunk of 150 records is processed concurrently
    }

Resource Isolation With limitedParallelism

Why limitedParallelism? If the database connection pool has, for example, X connections, keeping the parallelism limit below X prevents "Connection Timeout" errors.

Kotlin
    // Create the limited view once and share it. Calling limitedParallelism()
    // per record would give every coroutine its own independent limit.
    val dbDispatcher = Dispatchers.IO.limitedParallelism(15)

    runBlocking {
        messages.chunked(150).forEach { chunk ->
            val deferredResults = chunk.map { record ->
                async(dbDispatcher) {
                    try {
                        processRecord(record, usagePoolEntities, lockEntities)
                    } catch (e: Exception) {
                        log.error("Operation error: ${record.key()}", e)
                        buildErrorRecord(record, e)
                    }
                }
            }
            val results = deferredResults.awaitAll() // Structural waiting
            collectAndAggregate(results)
        }
    }

A dispatcher created with Dispatchers.IO.limitedParallelism(X) runs at most X coroutines in parallel, provided the same dispatcher instance is shared by all of them. This prevents the DB connection pool from being exhausted. Each coroutine returns its result through async. The awaitAll() call waits for all coroutines in the chunk to finish before proceeding to the next step.

runBlocking

This function blocks the calling thread until all concurrent operations are complete. This is the correct approach here because:

  • It keeps the Kafka consumer thread blocked until all records in the batch are processed, which preserves its offset commit behavior.
  • We still benefit from concurrent execution inside the runBlocking block.

3. Thread-Safe Result Structure

After the awaitAll() operation, all results are collected in thread-safe queues. Then a single batch write operation takes place. Using MutableList structures to combine results written by coroutines that run in parallel can lead to data loss. At this point, lock-free data structures should be preferred. ConcurrentLinkedQueue uses CAS (Compare-And-Swap) algorithms instead of synchronized blocks, which provides better performance in high-contention write operations.

Why Should We Use ConcurrentLinkedQueue?

  • Concurrent operations perform simultaneous writes to a shared collection of results.
  • Using MutableList for those writes leads to race conditions.
  • ConcurrentLinkedQueue performs well under safe, concurrent writes.

Kotlin
    data class AggregatedRecords(
        val processedSave: ConcurrentLinkedQueue<ProcessedEntity> = ConcurrentLinkedQueue(),
        val toDelete: ConcurrentLinkedQueue<UsagePoolEntity> = ConcurrentLinkedQueue(),
        val retryQueue: ConcurrentLinkedQueue<RetryEntity> = ConcurrentLinkedQueue()
    )

Handling DataIntegrityViolationException is important. When two consumer instances process the same record, one of them hits a unique constraint violation. Instead of failing the entire batch, the records are then saved one by one.

Kotlin
    aggregatedRecords.processedSave
        .chunked(150)
        .forEach { batch ->
            try {
                processedRepository.saveAll(batch)
            } catch (e: DataIntegrityViolationException) {
                batch.forEach { record ->
                    try {
                        processedRepository.save(record)
                    } catch (inner: DataIntegrityViolationException) {
                        // Duplicate record: already saved by another consumer, skip it
                    }
                }
            }
        }

4. Error Tolerance in Write Operations

Batch write (saveAll) operations are performant. However, a "Unique Constraint" error in a single record can cause the entire batch to fail. The following structure is critical to meet Optimistic Locking or Idempotency requirements.
Kotlin
    aggregatedRecords.processedSave.chunked(150).forEach { batch ->
        try {
            processedRepository.saveAll(batch)
        } catch (e: DataIntegrityViolationException) {
            // Fallback: Try one by one if batch fails
            batch.forEach { record ->
                try {
                    processedRepository.save(record)
                } catch (innerException: DataIntegrityViolationException) {
                    log.warn("Duplicate record skipped: ${record.id}")
                }
            }
        }
    }

5. Data Flow Diagram

  • Ingress: The Kafka batch is received inside runBlocking.
  • Preparation: All necessary context data is retrieved in bulk from the DB.
  • Execution: Coroutines are started asynchronously in chunks.
  • Synchronization: awaitAll() acts as a barrier that waits for all coroutines to complete.
  • Egress: Collected results are persisted with saveAll.

Conclusion

Processing Kafka messages in Spring Boot with Kotlin Coroutines not only increases speed but also improves code readability and makes resource management deterministic (predictable). The use of runBlocking lets us build a bridge between the blocking Kafka consumer thread and the suspending world without disrupting Kafka's offset management mechanism.

Dependencies

XML
    <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-coroutines-core</artifactId>
        <version>1.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
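The ingress-to-barrier flow from the article (runBlocking, chunking, a limited dispatcher, awaitAll) can be tried outside Spring with a small standalone sketch. It assumes only the kotlinx-coroutines-core dependency listed above. The names `processBatch`, `dbDispatcher`, and `handle` are hypothetical stand-ins for the article's listener and processRecord, and the limit of 15 is simply the article's example value. One limited dispatcher is created once and shared, so that the parallelism cap applies across all coroutines.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// One shared view of Dispatchers.IO that runs at most 15 tasks in parallel.
// The opt-in is needed on coroutines versions where the API is experimental.
@OptIn(ExperimentalCoroutinesApi::class)
private val dbDispatcher = Dispatchers.IO.limitedParallelism(15)

// Hypothetical helper: processes records chunk by chunk and blocks the
// calling thread until every record in the batch has been handled.
fun <T, R> processBatch(
    records: List<T>,
    chunkSize: Int = 150,
    handle: (T) -> R,
): List<R> = runBlocking {
    records.chunked(chunkSize).flatMap { chunk ->
        // Start one coroutine per record, then wait for the whole chunk.
        chunk.map { record -> async(dbDispatcher) { handle(record) } }
            .awaitAll()
    }
}

fun main() {
    val results = processBatch((1..10).toList(), chunkSize = 4) { it * 2 }
    println(results) // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
}
```

Because awaitAll() returns results in the order of the deferred list, the output order matches the input order even though records within a chunk run concurrently. In this sketch an exception thrown by `handle` would cancel the whole batch, which is why the article's version catches exceptions inside each coroutine.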
From ETL to Lakeflow: Shifting to a Declarative Data Paradigm
By Seshendranath Balla Venkata
Rust-Native Alternatives to Spark SQL and DataFrame Workloads
By Srinivasarao Rayankula
Combining Temporal and Kafka for Resilient Distributed Systems
By Akhil Madineni
The Big Data Architecture Blueprint: Core Storage, Integration, and Governance Patterns

Building scalable data systems often feels like navigating an endless sea of shifting paradigms. Engineers and architects are constantly forced to choose between centralizing data or distributing it, processing in batches or streaming in real time, and enforcing strict compliance or enabling rapid self-service analytics. Without a structured taxonomy, engineering teams risk building fragmented pipelines that accumulate technical debt. The following blueprint serves as a Data Patterns and Practices Library to help you align your infrastructure with proven engineering methodologies.

Architectural Patterns

  • Data lake: A centralized repository that allows storing structured and unstructured data at any scale, enabling raw data storage for various analytics purposes.
  • Data warehouse: A large, centralized repository for storing and managing structured data, optimized for high-performance analytics and reporting.
  • Lambda architecture: A data processing architecture that combines batch and stream processing for fault-tolerant, scalable, and real-time data analytics.
  • Kappa architecture: A data processing architecture that simplifies Lambda Architecture by only using stream processing for both real-time and historical data.
  • Microservices architecture: A design approach that structures applications as a collection of small, independently deployable services, allowing for greater flexibility and scalability.
  • Event-driven architecture: A software design pattern that promotes the production, detection, and reaction to events, enabling loose coupling and high scalability in distributed systems.
  • Polyglot persistence architecture: A data storage strategy that uses multiple types of databases to store and manage data according to its specific needs.
  • Data mesh: A decentralized approach to data architecture focusing on domain-oriented data ownership, self-serve data infrastructure, and product-oriented data delivery.
  • Data vault: A hybrid data modeling and storage methodology that combines aspects of 3NF and star schema to create a scalable, flexible, and auditable solution.
  • Streaming-first: An approach that prioritizes real-time data processing and analysis utilizing event streaming technologies.

Storage Patterns

  • Sharding: A method of distributing data across multiple database servers to improve performance and scalability.
  • Partitioning: The process of dividing a large table into smaller, more manageable pieces to improve query performance.
  • Replication: The process of copying data from one database to another to ensure availability, redundancy, and load balancing.
  • Federated storage: A storage architecture that integrates multiple storage systems under a unified management framework.
  • Object storage: A scalable architecture that manages data as objects rather than files or blocks, providing high performance for unstructured data.
  • Columnar storage: A format that stores data by column rather than row, which is particularly suited for analytics workloads.
  • Time-series: A specialized storage system designed to handle time-stamped data, such as sensor data or stock prices, efficiently.
  • Graph storage: A system optimized for storing and querying graph data, representing entities and their relationships in an interconnected structure.
  • In-memory storage: A storage architecture that stores data in RAM instead of on disk for significantly faster processing.
  • Hybrid storage: A solution that combines different storage types, such as on-premises and cloud, to optimize cost and performance.
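As one concrete illustration of the sharding entry above, the following Kotlin sketch routes record keys to shards with a stable hash. It is a simplified assumption rather than a description of any particular database: the `shardFor` function and the key format are hypothetical, and real systems often use consistent hashing or range-based schemes so that changing the shard count moves less data.

```kotlin
// Route a record key to one of `shardCount` shards using a stable hash.
fun shardFor(key: String, shardCount: Int): Int {
    require(shardCount > 0) { "shardCount must be positive" }
    // floorMod keeps the result in 0 until shardCount even when the
    // hash code is negative, which the % operator would not guarantee.
    return Math.floorMod(key.hashCode(), shardCount)
}

fun main() {
    val keys = listOf("user-1", "user-2", "user-3", "user-4")
    // Group keys by the shard they would be stored on.
    val placement = keys.groupBy { shardFor(it, 3) }
    println(placement)
}
```

The same key always maps to the same shard, so reads and writes for one key land on one server while different keys spread across the cluster.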
Integration Patterns

  • Extract, transform, load (ETL): A process of extracting data from source systems, transforming it, and loading it into a target system.
  • Extract, load, transform (ELT): A variation of ETL where data is first loaded into the target system and then transformed using the target's processing power.
  • Change data capture (CDC): A technique for capturing and processing changes in source data to enable incremental updates to target systems.
  • Data federation: A technique for integrating data from disparate sources without physically moving or copying it, providing a unified view.
  • Data virtualization: An approach that abstracts underlying data sources, allowing users to access and manipulate data without knowing its physical location.
  • Data replication: The process of copying data from one database to another to ensure data availability and redundancy.
  • Data synchronization: The process of keeping data in multiple locations consistent and up-to-date by propagating changes.
  • Data preparation: The process of cleaning, transforming, and enriching data to make it suitable for analysis or processing.
  • Publish/subscribe: A messaging pattern that decouples data producers and consumers using an intermediary message broker.
  • Request/reply pattern: A messaging pattern where a data consumer sends a request and waits for a response, allowing for synchronous communication.
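The publish/subscribe entry above can be made concrete with a minimal in-memory sketch in Kotlin. The `Broker` class and the topic and handler names are hypothetical and exist only for illustration. A production broker such as Apache Kafka adds persistence, partitioning, and delivery guarantees that this sketch does not attempt.

```kotlin
// Minimal in-memory broker: producers publish to a topic name, and every
// subscriber registered for that topic receives the message.
class Broker {
    private val subscribers = mutableMapOf<String, MutableList<(String) -> Unit>>()

    fun subscribe(topic: String, handler: (String) -> Unit) {
        subscribers.getOrPut(topic) { mutableListOf() }.add(handler)
    }

    // Returns the number of subscribers that received the message.
    fun publish(topic: String, message: String): Int {
        val handlers = subscribers[topic].orEmpty()
        handlers.forEach { it(message) }
        return handlers.size
    }
}

fun main() {
    val broker = Broker()
    val received = mutableListOf<String>()
    broker.subscribe("orders") { received.add("billing:$it") }
    broker.subscribe("orders") { received.add("shipping:$it") }
    broker.publish("orders", "order-42")
    println(received) // [billing:order-42, shipping:order-42]
}
```

The producer only knows the topic name, not who consumes the message, which is the decoupling the pattern is named for.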
Data Analytics

  • Descriptive analytics: The analysis of historical data to understand past events and trends, often presented through reports or dashboards.
  • Diagnostic analytics: The process of examining data to determine the causes of past events using techniques like data mining or correlations.
  • Predictive analytics: The use of data, statistical algorithms, and machine learning to predict future events based on historical data.
  • Prescriptive analytics: The process of recommending actions or decisions based on data analysis using optimization or simulation algorithms.
  • Real-time analytics: The analysis of data as it is generated or received to provide immediate insights and rapid decision-making.
  • Batch analytics: The processing and analysis of large volumes of data in batches, often scheduled at regular intervals.
  • Text analytics: The process of extracting meaningful information from unstructured text using natural language processing.
  • Geospatial analytics: The analysis of geographically referenced data to interpret spatial relationships and patterns.
  • Sentiment analytics: A technique using NLP to determine the sentiment or emotion expressed in textual data.
  • Network analytics: The analysis of network data to uncover patterns and interactions between nodes (entities) in a network.
Data Management

  • Master data management (MDM): The process of creating a single, authoritative source of truth for critical business data.
  • Reference data management (RDM): The practice of managing shared data (like codes or categories) used across multiple systems for consistency.
  • Metadata management: The process of creating and maintaining data about data to facilitate discovery and governance.
  • Data catalog: A searchable inventory of an organization's data assets, including datasets and reports.
  • Data lineage: The practice of tracking the flow of data through systems, including its origin and transformations.
  • Data versioning: The process of tracking and managing changes to data over time for recovery and auditing.
  • Data provenance: The process of documenting the origin, history, and processing of data to ensure trustworthiness and traceability.
  • Data lifecycle management: A comprehensive approach to managing data from creation to archival or deletion.
  • Data virtualization: A technique that abstracts underlying data sources to allow access without knowledge of physical location or structure.
  • Data profiling: The process of assessing data quality by collecting statistics and identifying patterns or anomalies.
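To make the data profiling entry above more tangible, here is a small Kotlin sketch that collects three basic statistics for one column. The `ColumnProfile` type and the choice of statistics are assumptions made for illustration. Real profiling tools typically also report value distributions, patterns, and type inference.

```kotlin
// Basic profile of a single column: row count, null count, distinct values.
data class ColumnProfile(val rows: Int, val nulls: Int, val distinct: Int)

fun profile(values: List<String?>): ColumnProfile =
    ColumnProfile(
        rows = values.size,
        nulls = values.count { it == null },
        distinct = values.filterNotNull().toSet().size,
    )

fun main() {
    // A hypothetical "country" column with one missing value.
    println(profile(listOf("US", "DE", null, "US")))
    // ColumnProfile(rows=4, nulls=1, distinct=2)
}
```

A high null count or an unexpectedly large number of distinct values is the kind of anomaly such a profile is meant to surface before the data is used downstream.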
Data Governance

  • Data stewardship: The practice of overseeing an organization's data to ensure quality, consistency, and compliance.
  • Data quality management: The process of measuring and improving the accuracy, completeness, and consistency of data.
  • Data policy management: The development and enforcement of standards and procedures that govern data use.
  • Data classification: The process of categorizing data based on sensitivity or risk to implement appropriate security measures.
  • Data retention and archival: Defining policies for storing and disposing of data based on legal and business requirements.
  • Data privacy compliance: Ensuring data practices adhere to laws and regulations like GDPR or CCPA.
  • Data lineage and provenance: Tracking the origin and flow of data through systems to ensure accuracy and compliance.
  • Data cataloging and discovery: Maintaining a searchable repository that provides an inventory of an organization's data assets.
  • Data risk management: Identifying and mitigating data-related risks such as breaches or corruption.
  • Data ownership: Assigning accountability for data assets to specific individuals or teams to ensure proper management.
Data Security

  • Data encryption: Encoding data to protect it from unauthorized access both at-rest and in-transit.
  • Data masking: Obscuring sensitive data by replacing it with fictitious data to prevent exposure to unauthorized users.
  • Data tokenization: Substituting sensitive data with non-sensitive tokens while still enabling some operations and analytics.
  • Data access control: Defining policies that determine who can access or modify data based on roles and security requirements.
  • Data auditing: Monitoring and recording data activities to detect unauthorized access or compliance violations.
  • Data anonymization: Removing personally identifiable information (PII) from datasets to protect individual privacy.
  • Data pseudonymization: Replacing sensitive data with artificial identifiers to reduce re-identification risk.
  • Data security monitoring: Continuously analyzing systems and networks for potential security threats or breaches.
  • Data activity monitoring: Continuous analysis of database transactions to detect unauthorized access or policy violations.
  • Data loss prevention: Tools and practices designed to protect sensitive data from unauthorized leakage or theft.

Key Use Cases and Architectural Examples

1. Real-Time Distributed Processing for High-Velocity Streams

For platforms requiring immediate analytical insights, minimizing architectural complexity while handling large-scale data streams is a primary challenge.

  • Core patterns: Kappa Architecture, Streaming-first, and In-Memory Storage.
  • Production tech stack: Apache Kafka, PySpark, Structured Streaming, and Redis.

Specific example: In a high-volume financial transaction system, implementing a Kappa Architecture simplifies the processing pipeline by routing both real-time logs and historical data events through a single stream engine. By prioritizing a streaming-first approach using an Apache Kafka cluster, the platform eliminates the complex dual-pipeline maintenance found in traditional Lambda setups.
A PySpark Structured Streaming application consumes these event streams directly, executing stateful window transformations on the fly. To achieve sub-millisecond latency for immediate fraud lookups, the working state or frequently queried reference tables are held in an In-Memory Storage layer like Redis, which provides access speeds that disk-based alternatives cannot match.

2. Decentralized Architecture for Enterprise Scaling

Large organizations often face engineering bottlenecks when a single, centralized team manages a massive monolithic data lake.

  • Core patterns: Data Mesh, Data Governance, and Data Cataloging and Discovery.
  • Production tech stack: Databricks Unity Catalog, AWS Lake Formation, and Snowflake Data Sharing.

Specific example: A multi-national banking entity transitions to a Data Mesh framework, shifting data asset ownership away from a centralized team to domain-oriented groups, such as Risk Modeling and Retail Analytics, which deliver data as independent products. To maintain unified compliance, the infrastructure relies on strict Data Governance policies managed through Databricks Unity Catalog and AWS Lake Formation, enforcing centralized data stewardship, role-based access control, and automated data classification. These localized datasets are then securely exposed across departments via Snowflake Data Sharing. A centralized Data Catalog runs continuously on top of these endpoints, giving developers across the entire enterprise a single, searchable inventory to securely discover, audit, and consume cross-domain data products.

3. High-Performance Cloud Analytics and Reporting

To optimize modern cloud infrastructure, data pipelines must maximize query performance while containing compute and storage costs.

  • Core patterns: Extract, Load, Transform (ELT) and Columnar Storage.
  • Production tech stack: dbt (Data Build Tool), Delta Lake, Snowflake, and Apache Spark.
Specific example: A modern enterprise analytics platform ingests massive volumes of raw operational data into cloud object storage, choosing a flexible ELT pipeline over traditional ETL frameworks. Raw files are loaded directly into a target data platform like Snowflake or Databricks Delta Lake, leveraging cloud elasticity to execute complex transformations post-load using dbt or optimized Spark SQL queries. To maximize business intelligence performance, the underlying files are stored in Columnar Storage formats like Parquet. This structures data by column rather than row, so analytical queries read only the specific columns requested for a report. This optimization cuts down disk I/O operations and speeds up complex calculations across billions of historical records.

Conclusion

Successfully implementing a modern data infrastructure is never about finding a single pattern to solve every corporate challenge. True architectural maturity lies in knowing how to weave these paradigms together. By mapping tactical storage choices directly to overarching governance and integration frameworks, software architects can build resilient environments capable of evolving alongside business demands.

Which of these three architectural focus areas aligns best with your current production environment? Let me know in the comments below.

By Ram Ghadiyaram
Is the Data Warehouse Dead? 3 Patterns From Enterprise Architecture That Answer This Question

Architectural Debate

There is a classic debate that data architects often have among themselves: how to fit a traditional data warehouse into a data lake or enterprise data platform. This article walks through the architecture evolution and describes three architecture patterns that I have implemented across enterprises, to help you decide where a data warehouse fits in a modern data platform.

The data warehouse acted as a single source of truth that finance, retail, and operations teams could trust for day-to-day reporting. Appliance warehouses like Teradata, Netezza, and SybaseIQ dominated enterprise data for decades, and SQL was the universal language that held it all together.

Then two things happened simultaneously. Data volumes outgrew what any single warehouse could handle, and cloud storage made storing everything cheaper. This created a genuine architectural dilemma that most organizations have not yet resolved cleanly. The most common mistake I have seen is the “one size fits all” approach, where workloads are mixed together without considering their purpose or usage, driven by people and “rigid” processes. This creates a cost overhead over time and significantly limits the ability to get real value from the data.

After implementing data platforms at companies including a large US beverage manufacturer and a global media company, and as part of the AWS data architecture practice, I have seen three distinct patterns emerge. Each has a legitimate use case. Each has a failure mode that is entirely predictable, and yet organizations keep hitting it.

Context: What the Warehouse Was Actually Good At

Before evaluating patterns, it helps to be precise about what warehouses solved:

  • Transformations. CDC from transactional sources, slowly changing dimensions (SCD), and fact aggregation. These are the use cases that SQL warehouses solved reliably, with SQL, ETL tooling, or both.
  • Reporting. Pre-computed, governed, low-latency access for drill-down/roll-up and summarization.

Data lakes historically struggled with both. ACID compliance was unreliable, and complex transformations required significant engineering. Open table formats like Apache Iceberg, Apache Hudi, and Delta Lake changed this equation by bringing warehouse-comparable reliability to object storage. That shift is what makes the three patterns below possible.

Pattern 1: Data Warehouse as the Enterprise Data Platform

Best for: BI-heavy organizations, mature SQL teams

Data flow: I have observed this pattern repeatedly in organizations with a large BI reporting base, where canned reports, dashboards, and self-service BI dominate business requirements even today.

When an engineer should choose this: Your organization has a purely BI-driven workload today, your team is SQL-native, and ML, streaming, or unstructured data requirements do not currently exist.

Where it breaks: The core problem is not what this pattern does; it is what it cannot do when analytical needs evolve. And in my experience, they always do.

Specific failure modes engineers hit at scale:

  • Transformation pressure on one engine. Ingestion, transformation, and consumption all compete for warehouse compute. At high data volumes, this creates workload management overhead or requires additional cost to separate readers from writers.
  • Vertical scaling becomes the only option. When the warehouse chokes under volume, the operations team scales up the cluster or splits reader/writer nodes by business unit. Both options add cost without adding architectural flexibility.
  • Object storage is a dump, not an asset. Raw files land in object storage but are never cataloged, versioned, or governed. Schema evolution and data lineage take significant engineering effort.
  • No distributed compute path. ML model training, log analytics, clickstream analysis, and unstructured data have no home in this architecture.
• Re-architecture is expensive when needs change. Moving off this pattern requires re-architecting, which often ends in the purchase of a costly SQL-driven SaaS solution because the original platform was not built correctly.

Summary: Pattern 1 works cleanly for what it is designed for. The problem is that the design is not future-ready and significantly limits the value that data can generate.

Pattern 2: No Warehouse Scenario

Best for: Engineering-led teams, ad hoc analytics, big data exploration

Data flow: There is no data warehouse in the stack; instead, a serverless query engine provides a SQL abstraction directly over object storage. This pattern originated with Presto and Impala for big data analytics and worked well in that context.

When an engineer should choose this: Your workload is dominated by ad hoc analytical queries, ML feature exploration, or log analytics.

Where it breaks: This pattern struggles when it gets applied to BI workloads. The specific challenges engineers hit:

• Query performance is tricky. Without careful partitioning and file size optimization on the data, a dashboard refresh at 9 am (cold start) may trigger a full scan of terabytes of data, causing unpredictable BI performance.
• Cost is unpredictable. These engines charge per TB scanned, not per result row. Forgetting a date filter on a large table can add thousands of dollars to the bill. Warehouse compute costs are predictable; object-storage query costs are not.
• Concurrency challenges. Warehouses have mature workload management techniques that handle queuing, prioritization, and resource allocation. Serverless engines have yet to reach that maturity, so concurrent dashboard users face timeouts and failures, not graceful queuing.
• Caching has limited reach. Result-set caching helps for identical repeated queries, but any variation in filter parameters triggers a full storage scan. Dynamic dashboard filters with user-specific slicers make this caching largely ineffective for BI use cases.
• Data lake schemas evolve frequently. Cloud catalogs like Glue or Unity Catalog track changes, but downstream BI dashboards break silently when columns shift. A warehouse enforces the contract that protects reports.

Summary: Pattern 2 works well for scenarios that require distributed processing at scale. The problem is that the design is not built for low-latency use cases.

Pattern 3: Purposeful Hybrid

Best for: Mixed workloads, enterprise scale, cost-conscious organizations with a variety of workloads and adaptable, future-facing needs

In my experience, this pattern offers the best of both worlds because it relies on deliberate data segmentation rather than a rigid architectural decision. It routes each workload to the compute and storage that are purpose-built for it. ML models consuming ten years of data should not compete with operational dashboards needing sub-second response on last quarter's inventory. Object storage is cheap and scales horizontally, so it holds everything, whereas the warehouse holds only what requires low-latency relational access.

Implementation on the warehouse side: Instead of loading the full silver and gold layers into the warehouse, load only the date range business users need for low-latency reporting. In retail, inventory movement rarely needs more than a rolling year in the warehouse. In travel and hospitality, promotional rate performance reports typically span a few months. A scheduled query copies the relevant date-partitioned data from the data lake into the warehouse and purges expired ranges from the warehouse once they age out. The source data remains in object storage permanently. This single mechanism drastically reduces warehouse compute and storage costs while keeping BI response times fast.

Handling historical range queries: The common objection is: what about reports that need five years of history? The right engineering question is not "how do we make that fast?" but "does that query need millisecond latency?"
A five-year inventory trend analysis requested once a quarter may not need a millisecond response time. This is the exact use case for federated queries. Redshift Spectrum, Athena, and Synapse Serverless allow external tables to be defined over S3/ADLS data, queryable alongside physical warehouse tables with standard SQL joins. A retail analyst can query this week's inventory from Redshift, joined against three years of history in S3, without loading that history into the warehouse:

SQL
-- Example: federated query joining warehouse (hot) + lake (cold)
SELECT w.sku, w.inventory_count, h.avg_inventory_12m
FROM warehouse.inventory_current w
JOIN external_schema.inventory_history h  -- data lives in S3
  ON w.sku = h.sku
WHERE w.report_date = CURRENT_DATE

Decision Framework

The following table helps decide which pattern best suits a given need.

Criterion | Pattern 1 | Pattern 2 | Pattern 3
BI performance | High | Unpredictable | High
ML / unstructured | None | Native | Native
Cost at scale | Expensive | Unpredictable | Optimized
Governance | Low | Strong | Strong
Scalability | Vertical | Horizontal | Horizontal
Implementation complexity | Low | Medium | High
Future-readiness | Low | Medium | High
Team skill required | SQL | Spark/Distributed computing | Both

The mistake is not choosing the wrong pattern; it is applying one pattern to all workloads. Every large enterprise I have worked with has a mix of workload types that no single architecture serves equally well.

Situation | Recommended pattern
>70% BI and reporting workload | Pattern 1 or 3
Heavy ML, log, or unstructured data | Pattern 2 or 3
Petabyte scale with cost pressure | Pattern 3
Small to mid-size, SQL-native team | Pattern 1
Ad hoc analytics dominant | Pattern 2
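The rolling-window load described under Pattern 3 can be sketched as a small planning step that decides which date partitions to copy into the warehouse and which to purge. This is a minimal sketch, assuming daily partitions; `plan_rolling_window` and its parameters are hypothetical names, and the actual copy and purge statements depend on the warehouse in use.

```python
from datetime import date, timedelta


def plan_rolling_window(today, window_days, loaded_dates):
    """Return (dates_to_load, dates_to_purge) for a rolling hot window.

    The hot window covers `window_days` days ending at `today` (inclusive).
    `loaded_dates` is the set of date partitions currently in the warehouse.
    """
    window = {today - timedelta(days=offset) for offset in range(window_days)}
    loaded = set(loaded_dates)
    to_load = sorted(window - loaded)   # present in the lake, missing from the warehouse
    to_purge = sorted(loaded - window)  # aged out; the source copy stays in object storage
    return to_load, to_purge


# Example: a 3-day window evaluated on 2024-01-04.
loaded = {date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 3)}
load, purge = plan_rolling_window(date(2024, 1, 4), 3, loaded)
print("load:", load)
print("purge:", purge)
```

Keeping the plan separate from the statements that execute it makes the schedule easy to test and makes reruns harmless: a second run on the same day produces empty lists.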

By Nabarun Bandyopadhyay
Building Threat Intelligence Pipelines Using Python, APIs, and Elasticsearch

Threat intelligence becomes operationally valuable when indicator data can be collected continuously, normalized into a consistent schema, and queried fast enough to support enrichment and detection workflows. Standardized exchange formats such as STIX and transport protocols such as TAXII exist specifically to make machine-readable cyber threat intelligence easier to distribute at scale, while preserving enough structure for downstream correlation and context.

Operational Requirements That Shape Intelligence Pipelines

A threat intelligence pipeline is best treated as data engineering with security-specific constraints: provenance must remain intact, updates and revocations must be representable, and “freshness” should be measurable rather than assumed. STIX is explicitly designed to model cyber threat intelligence using typed objects with attributes, and it supports building richer context by linking objects through relationships rather than shipping flat indicator lists.

A practical pipeline design often separates raw ingestion from normalized storage. Raw ingestion preserves the original feed payload for auditability and reversibility, while normalized storage produces documents that are easy to match against telemetry. This split aligns with STIX’s modeling approach, where producers may publish Indicators expressed as STIX patterns and connect them to other objects through relationship constructs, enabling consumers to choose between lightweight atom extraction for matching and graph-style context for analysis.

Pulling From TAXII and Other APIs Without Losing Provenance

TAXII 2.1, published by OASIS Open, defines a RESTful API and related requirements for TAXII clients and servers to exchange cyber threat information in a scalable manner, with STIX 2.1 support described as mandatory to implement in the TAXII context.
The IANA media type registration for application/taxii+json also documents that the older application/vnd.oasis.taxii+json name is a deprecated alias, which matters in real integrations because content negotiation and strict header validation vary by server implementation.

TAXII 2.1 also formalized mechanics that directly affect pipeline correctness under load. The CTI documentation notes that TAXII 2.1 added limit and next URL parameters and updated content negotiation and media types, reflecting a move toward pagination patterns that can handle large or rapidly changing datasets more safely than item-based offset pagination. A Python pipeline can either implement paging logic directly or delegate it to a client library; the taxii2client project documents a TAXII 2.1 client API that uses application/taxii+json;version=2.1 for Accept handling and provides an as_pages helper for TAXII 2.1 endpoints that support pagination, including “Get Objects” and “Get Manifest.”

Python
from taxii2client.v21 import as_pages

def iter_taxii_objects(collection, cursor, page_size=2000):
    """Yield STIX objects added after `cursor`, one TAXII page at a time."""
    accept = "application/taxii+json;version=2.1"
    for page in as_pages(collection.get_objects, per_request=page_size,
                         added_after=cursor, accept=accept):
        # Pages are normally parsed envelopes (dicts); tolerate response objects too.
        envelope = page if isinstance(page, dict) else page.json()
        for obj in envelope.get("objects", []):
            yield obj

This pattern avoids embedding server-specific pagination tokens into pipeline logic while still enabling incremental collection reads. The cursor argument can be persisted as an ISO-8601 timestamp when the upstream provides a timestamp filter, a model commonly used by TAXII-feed vendors; for example, ESET documents STIX 2.1 feeds delivered via TAXII 2.1 collections and describes an added_after filter parameter for retrieving objects added after a specified timestamp, alongside retention constraints that make incremental pulls operationally necessary.

Not all threat intelligence sources are TAXII-first.
MISP Project documentation describes a REST-accessible STIX export capability and explicitly notes that STIX XML export can be slow and lead to timeouts with large events or collections, while STIX JSON avoids that problem, making JSON a more stable transport choice for high-volume automation. The same ecosystem provides a published OpenAPI specification and a dedicated converter library, misp-stix, which supports bidirectional conversion across STIX versions, including STIX 2.1, and includes features such as pattern parsing and indicator-observable fingerprinting, reducing the cost of maintaining bespoke mapping logic for every upstream source.

Normalization Into ECS and STIX-Aware Semantics

Normalization is where a pipeline either becomes queryable or becomes another archive. The Elastic Common Schema (ECS) threat field guidance explicitly frames threat.* as the mapping layer that normalizes threat intelligence indicators from many structures into consistent fields, and it links that normalization to detection and enrichment workflows such as indicator match rules. In particular, the guidance calls out normalizing indicators into threat.indicator.* so that disparate feeds can be queried consistently and used to build indicator matching logic without treating every provider as a special case.

Atomic indicators benefit from being stored both as a “typed value” and with vendor identifiers. ECS defines threat.indicator.type values aligned with cyber observable types and documents threat.indicator.id as a place to store indicator IDs, noting that a STIX 2.x indicator ID is a common approach and that the field can hold multiple values to represent the same indicator across systems. The practical implication is that a pipeline can preserve the upstream STIX identifier, attach a stable provider-local identifier when necessary, and still normalize the matchable indicator value into fields such as threat.indicator.ip or other threat.indicator.* subfields.
Python
def stix_confidence_to_nlmh(value):
    """Map a STIX 0-100 confidence score onto the None/Low/Medium/High scale."""
    if value is None:
        return "Not Specified"
    v = int(value)
    if v == 0:
        return "None"
    if 1 <= v <= 29:
        return "Low"
    if 30 <= v <= 69:
        return "Medium"
    if 70 <= v <= 100:
        return "High"
    return "Not Specified"

def extract_atomic_from_pattern(pattern):
    """Return (type, value) for simple single-value patterns, else (None, None)."""
    p = (pattern or "").strip()
    if "ipv4-addr:value" in p and "'" in p:
        return ("ipv4-addr", p.split("'")[1])
    if "domain-name:value" in p and "'" in p:
        return ("domain-name", p.split("'")[1])
    if "url:value" in p and "'" in p:
        return ("url", p.split("'")[1])
    return (None, None)

def stix_indicator_to_ecs(indicator_obj, provider, fetched_at_iso):
    """Build an ECS threat.indicator document, or None for unsupported patterns."""
    itype, ivalue = extract_atomic_from_pattern(indicator_obj.get("pattern"))
    if not itype:
        return None
    doc = {
        "@timestamp": fetched_at_iso,
        "event": {"kind": "enrichment", "category": ["threat"], "type": ["indicator"]},
        "threat": {
            "indicator": {
                "type": itype,
                "provider": provider,
                "name": indicator_obj.get("name") or ivalue,
                "description": indicator_obj.get("description"),
                "confidence": stix_confidence_to_nlmh(indicator_obj.get("confidence")),
                "reference": indicator_obj.get("id"),
                "id": [indicator_obj.get("id")],
            }
        },
        "labels": {"feed": provider},
    }
    # Store the matchable value in a typed threat.indicator.* subfield.
    if itype in {"ipv4-addr", "ipv6-addr"}:
        doc["threat"]["indicator"]["ip"] = ivalue
    elif itype == "domain-name":
        doc["threat"]["indicator"]["url"] = {"domain": ivalue}
    elif itype == "url":
        doc["threat"]["indicator"]["url"] = {"full": ivalue}
    return doc

The extraction logic deliberately scopes itself to common “atomic” patterns to keep parsing deterministic and to minimize the risk of silently incorrect field derivation. This constraint matches the operational intent of ECS indicator guidance, which emphasizes consistent querying and reuse for indicator match rules after normalization, rather than attempting to fully interpret every possible composite STIX pattern in real time.

Indexing Strategy in Elasticsearch That Avoids Accidental Cost Explosion

Elasticsearch storage decisions are not purely operational preferences because they alter what update patterns are safe.
Data streams consist of one or more hidden backing indices and require a matching index template; every document indexed into a data stream must include an @timestamp field mapped as a date or date_nanos field type. Data streams are described as a good fit for most time-series use cases, while the documentation explicitly flags that frequent reuse of the same _id expecting last-write-wins can indicate a better fit for an index alias with a write index rather than a data stream. Threat intelligence pipelines often straddle that boundary: indicator state changes and revocations benefit from upsert semantics, while ingestion audits benefit from append-only history.

Retention should be tied to query strategy. Elastic Security documentation warns that indicator match rules can consume significant resources and recommends limiting the indicator index query time range to the minimum necessary for coverage, with a default example query of the past 30 days. Even outside an alerting engine, a time-bounded indicator set tends to be operationally safer: it reduces scan cost, makes cache behavior more predictable, and avoids matching against long-expired infrastructure that is no longer relevant. When vendor retention is narrower, such as the 14-day retention window described for some TAXII feeds, the pipeline should persist that constraint as a policy and avoid relying on “full historical replay” as a recovery mechanism.

Ingestion-Time Guardrails With Python, Ingest Pipelines, and Bulk Writes

Ingest pipelines provide an explicit place to enforce normalization rules at ingest time. Elastic documentation describes ingest pipelines as a sequence of processors that run sequentially to transform data before it is indexed into a data stream or index, supporting operations such as removal, extraction, and enrichment.
In addition, ingest processors can access ingest metadata under the _ingest key, and Elasticsearch notes that pipelines create _ingest.timestamp by default and that indexing ingest metadata requires explicitly setting it via a processor.

JSON
PUT /_ingest/pipeline/ti_normalize
{
  "description": "Normalize threat intel indicators into ECS threat.indicator.*",
  "processors": [
    { "set": { "field": "event.kind", "value": "enrichment" } },
    { "set": { "field": "event.category", "value": ["threat"] } },
    { "set": { "field": "event.type", "value": ["indicator"] } },
    { "set": { "field": "event.ingested", "value": "{{{_ingest.timestamp}}}" } },
    {
      "fingerprint": {
        "fields": ["threat.indicator.provider", "threat.indicator.type", "threat.indicator.ip"],
        "target_field": "threat.indicator.fingerprint",
        "method": "SHA-256",
        "ignore_missing": true
      }
    }
  ]
}

Bulk ingestion should align with Elasticsearch’s wire format rules. The bulk API documentation describes NDJSON requirements, including that the final line must end with a newline character and that JSON actions and sources should not be pretty printed because newlines are literal delimiters. A Python producer can serialize documents into bulk batches, assign a deterministic _id derived from provider and atomic indicator value to make writes idempotent, and optionally route documents through the normalization pipeline configured above.
Python
import json

def build_indicator_id(provider, itype, ivalue):
    # Deterministic _id: re-ingesting the same indicator overwrites instead of duplicating.
    return (provider + ":" + itype + ":" + ivalue).lower()

def bulk_index_indicators(es_http, index_name, docs):
    # es_http is any HTTP client wrapper exposing post(path, body=..., headers=...).
    lines = []
    for d in docs:
        ti = d.get("threat", {}).get("indicator", {})
        doc_id = build_indicator_id(
            ti.get("provider", "unknown"),
            ti.get("type", "unknown"),
            ti.get("ip", ti.get("name", "unknown")),
        )
        action = {"index": {"_index": index_name, "_id": doc_id, "pipeline": "ti_normalize"}}
        # json.dumps emits single-line JSON, which is what NDJSON requires.
        lines.append(json.dumps(action))
        lines.append(json.dumps(d))
    payload = "\n".join(lines) + "\n"  # the trailing newline is mandatory
    return es_http.post("/_bulk", body=payload,
                        headers={"Content-Type": "application/x-ndjson"})

The NDJSON newline termination is not optional, so building the payload in a way that always emits a trailing newline avoids a class of partial-ingest failures that are hard to diagnose under load. For enrichment use cases, ingest-time join behavior should be applied cautiously: Elastic warns that the enrich processor can impact ingest speed, recommends benchmarking, and explicitly states that it is not recommended for appending real-time data, instead working best with reference data that does not change frequently. This guidance aligns with threat intelligence practice: fast-changing indicators typically work better as a queried dataset, joined at search or detection time, rather than as an ingest-time enrichment applied to every event.

Conclusion

A threat intelligence pipeline built on Python, APIs, and Elasticsearch becomes reliable when it treats schemas, media types, and update semantics as core engineering concerns instead of integration details. STIX and TAXII provide standard object modeling and transport expectations, including content negotiation and pagination mechanics, while ECS provides a target schema that makes indicators consistently queryable and directly usable by matching workflows such as indicator match rules.
High-quality implementations preserve provenance, normalize into threat.indicator.* with STIX-aligned confidence semantics, choose an indexing strategy that matches expected update patterns, and enforce ingestion guardrails through ingest pipelines, simulation, and NDJSON-correct bulk writes.
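The incremental pulls discussed earlier depend on persisting the added_after cursor safely between runs. The following is a minimal sketch, assuming a local JSON file as the checkpoint store; `load_cursor`, `advance_cursor`, and the file layout are hypothetical names chosen for illustration. The candidate timestamp would come from the feed itself, for example the newest date-added value the TAXII server reports for a page.

```python
import json
import os
import tempfile
from datetime import datetime


def parse_ts(value):
    """Parse an ISO-8601 timestamp such as 2024-05-01T12:00:00.000Z."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def load_cursor(path, default=None):
    """Return the persisted added_after cursor, or `default` if none exists."""
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)["added_after"]
    except FileNotFoundError:
        return default


def advance_cursor(path, candidate):
    """Persist `candidate` only if it is newer than the stored cursor.

    The value is written to a temporary file and renamed into place, so a
    crash cannot leave a half-written checkpoint behind.
    """
    current = load_cursor(path)
    if current is not None and parse_ts(candidate) <= parse_ts(current):
        return current  # never move the cursor backwards
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"added_after": candidate}, fh)
    os.replace(tmp_path, path)
    return candidate
```

Calling `advance_cursor` only after a bulk write has succeeded means a failed run re-reads the same window, which the deterministic _id scheme makes harmless.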

By Krishnaveni Musku
Optimizing Databricks Spark Pipelines Using Declarative Patterns

If you've ever inherited a Spark job that runs in 35 minutes and someone asks you to make it faster, you know the routine. You start by checking partition counts, then file sizes, then shuffle stages, then broadcast hints. You find a handwritten OPTIMIZE schedule from 2022, a Z-ORDER on the wrong column, and a cluster sized for last year's data volume. By the time you've made the job fast, you've absorbed three new things to maintain. The next person to inherit it will absorb four.

This pattern (call it the hand-tuning treadmill) is what the declarative optimization story on Databricks is trying to break. It's not a single feature; it's a cluster of capabilities that collectively let teams describe what a table should look like and let the engine handle the physical optimizations. What follows is the practical view of those patterns: where they fit, what they replace, and how to migrate without a rewrite weekend.

1. The Hand-Tuning Treadmill: Why Imperative Optimization Doesn't Scale

Before getting into the declarative side, it's worth being concrete about what "imperative Spark optimization" actually means in production. The shape is consistent across teams I've audited:

• Layout decisions frozen on day one. Somebody picks a partition column when the table is created. The data shape changes a year later. Nobody re-partitions because the migration is scary. Query plans drift toward full scans.
• Maintenance jobs that nobody owns. An OPTIMIZE / Z-ORDER / VACUUM script lives in a notebook scheduled at 3 AM. It runs on a cluster that's slightly mis-sized. When data volume grows, the job runs into the morning workload, and people complain about latency.
• Cluster sizing as a guess. Worker count is a heuristic from a senior engineer's memory of last year's spike. Half the time it's too big, half the time it's too small, and the cost discussion gets emotional.
• Hint-driven plans. Broadcast hints, repartition hints, coalesce(N): sprinkled through pipelines to fix yesterday's problem, kept indefinitely because removing them feels risky.

None of these are bugs. They're symptoms of the imperative model: the team owns the layout, the maintenance, the sizing, and the plan tuning. In small pipelines, ownership is fine. At scale, it becomes the bottleneck that the team can't outsource.

2. What "Declarative" Means in the Spark Optimization Context

Declarative is a word that gets used in two different ways here, and it's worth pulling them apart. Within Lakeflow pipelines (formerly DLT), it means "describe the tables, not the steps": the engine builds the DAG and runs it. But in the broader optimization story, declarative also means "describe the desired property of the table or workload, not the operations to maintain it":

• Layout: I want this table clustered by these columns; figure out when and how to re-cluster.
• Maintenance: I want this table optimized and vacuumed; figure out the schedule.
• Ingestion: I want all new files in this path picked up exactly once; figure out checkpointing and listing.
• Quality: These rows must satisfy these expectations; enforce them and report what gets dropped.
• Compute: I want this query fast and not wasteful; size and scale appropriately.

Each one of those bullets corresponds to a piece of the declarative stack. Used together, they replace a remarkable amount of the boilerplate that has historically lived in Spark pipelines.

The mental shift: You stop writing operations against the table and start writing properties of the table. The engine becomes the actor; you become the editor.

3. The Declarative Optimization Stack on Databricks

The chart below maps each thing the team declares to the engine capability that handles it, ending at the physical Delta table. It's the picture I draw on whiteboards when teams ask, "What's the order to adopt these in?"

Figure 1.
The declarative optimization stack: each user-facing intent at the top maps to a continuous engine behavior, which keeps the underlying Delta tables well-clustered, compacted, and statistically up-to-date, without human intervention.

Two things are worth highlighting in this picture. First, every box in the engine row is something that runs continuously, not on a cron; there is no daily "optimization window" anymore. Second, the bottom layer is identical to what you'd get from any well-tuned imperative pipeline: 256 MB Parquet files with current statistics. The declarative path doesn't change what good looks like; it changes who does the work to keep things looking good.

4. Layout: Liquid Clustering Replaces Hand-Maintained Z-ORDER

Liquid Clustering is the change with the largest practical impact, because partition-key choices are where most lakehouse pipelines accumulate the most technical debt. The declarative version: you specify the columns the data is most often filtered or joined by, and the engine maintains a layout that supports those access patterns incrementally, as new data arrives, without a full rewrite. When access patterns change, you change the cluster columns, and the engine re-clusters in the background.

Defining Liquid-Clustered Tables

SQL
-- New table, clustered by the columns most commonly filtered on.
-- No more PARTITIONED BY, no more guessing at partition cardinality.
CREATE TABLE prod.gold.daily_totals (
  account_id STRING,
  region STRING,
  ingest_date DATE,
  daily_total DECIMAL(18,2),
  txn_count BIGINT
)
USING DELTA
CLUSTER BY (region, ingest_date, account_id);

-- Even better: let the engine pick the clustering columns by
-- observing real query patterns over time.
CREATE TABLE prod.gold.events_clustered
USING DELTA
CLUSTER BY AUTO
AS SELECT * FROM prod.silver.events;

Migrating an Existing Partitioned/Z-ORDER Table

SQL
-- Convert a legacy partitioned table to liquid clustering.
-- Existing data files are not rewritten immediately; the engine
-- rebalances incrementally on subsequent writes + maintenance.
-- Caveat (verify against current Databricks docs): ALTER TABLE ... CLUSTER BY
-- is documented for unpartitioned Delta tables; a table created with
-- PARTITIONED BY may first need to be rewritten, for example with CTAS.
ALTER TABLE prod.silver.transactions
CLUSTER BY (account_id, ingest_date);

-- Force the first clustering pass for a freshly converted table
OPTIMIZE prod.silver.transactions FULL;

Why this matters: the recurring 2 AM Slack thread of "can we re-partition this table?" goes away. Layout becomes a property you change with one DDL statement, not a multi-week rewrite project.

5. Maintenance: Predictive Optimization Replaces Cron-Driven OPTIMIZE/VACUUM

Predictive optimization is the part that retired the most legacy code in the pipelines I've migrated. Once enabled at the catalog or schema level, the engine monitors each table's read and write patterns and decides on its own when to compact files, re-cluster, vacuum, and refresh statistics. The big win isn't the operations themselves (the imperative pipeline could already run those); it's that the timing is observation-driven, not schedule-driven. Tables that get heavy ingestion get more frequent maintenance. Cold tables get left alone.

SQL
-- Turn it on at the catalog level once; new tables inherit.
ALTER CATALOG prod ENABLE PREDICTIVE OPTIMIZATION;

-- Or at the schema level for a phased rollout
ALTER SCHEMA prod.gold ENABLE PREDICTIVE OPTIMIZATION;

-- Inspect recent maintenance commits on a given table
SELECT
  operation,
  operationMetrics['numAddedFiles'] AS files_added,
  operationMetrics['numRemovedFiles'] AS files_removed,
  operationMetrics['numAddedBytes'] AS added_bytes,
  timestamp
FROM (DESCRIBE HISTORY prod.gold.daily_totals)
WHERE userMetadata IS NULL  -- rough filter: commits without user-supplied metadata
  AND operation IN ('OPTIMIZE', 'VACUUM')
  AND timestamp >= current_timestamp() - INTERVAL 7 DAYS
ORDER BY timestamp DESC;

What you should delete after enabling this: the nightly notebook that runs OPTIMIZE on every table in a schema, the VACUUM cron job, the ANALYZE TABLE wrapper, and the alerting that wakes someone up when those jobs run long. None of them are needed anymore, and leaving them on creates duplicate work that the engine and the cron will fight over.

6. Ingestion: Auto Loader Replaces Listing-Based File Detection

Auto Loader is the declarative answer to the perennial "which files have we processed already?" problem. Instead of listing a directory, comparing it to a state file, and figuring out the new bits, you describe the source location and the format and let the engine maintain its own incremental state. It can detect new files through cloud-native event notifications (S3 events, ADLS notifications) or through efficient directory listing, and the checkpoint is just another piece of state the engine owns.

Python
from pyspark.sql.functions import current_timestamp

# Streaming ingest from S3 with schema inference + evolution.
# Replaces hand-maintained checkpointing, listing logic, and
# whatever file-tracking table the team built two years ago.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", "s3://acme-checkpoints/txns_schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://landing/txns/")
    .withColumn("_ingest_ts", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://acme-checkpoints/txns_writer")
    .trigger(availableNow=True)  # batch-style; runs to completion
    .toTable("prod.bronze.txns"))

Two notes from production. First, schemaEvolutionMode is the option that prevents the silent-data-loss class of bugs when partner schemas change; pick the policy explicitly rather than letting it default. Second, trigger(availableNow=True) gives you batch ergonomics on a streaming source: the job runs until it has consumed everything and exits, which is what most teams actually want for daily ingestion.

7. Transforms and Quality: Declarative Pipelines Replace Bare Spark + External DQ

The final piece is the transformation layer. Lakeflow pipelines (the rebrand of Delta Live Tables) let you declare each table as a Python or SQL definition, and add expectations as a first-class concept. The engine derives the DAG from the dependencies and enforces the expectations on every write; the data quality framework, the lineage layer, and the orchestration glue collapse into a single artifact.
Python
import dlt
from pyspark.sql.functions import sum as _sum

@dlt.table(
    name="silver_txns",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "delta.tuneFileSizesForRewrites": "true",
    },
    cluster_by=["account_id", "ingest_date"],
)
@dlt.expect_or_drop("non_null_amount", "amount IS NOT NULL")
@dlt.expect_or_fail("valid_currency", "currency IN ('USD','EUR','GBP')")
@dlt.expect("txn_id_present", "txn_id IS NOT NULL")
def silver_txns():
    return (dlt.read_stream("bronze_txns")
            .dropDuplicates(["txn_id"]))

@dlt.table(name="gold_daily_totals")
def gold_daily_totals():
    return (dlt.read("silver_txns")
            .groupBy("ingest_date", "account_id", "region")
            .agg(_sum("amount").alias("daily_total")))

The decorators do four things at once: define the table, declare its layout (cluster_by), declare its quality rules, and let the engine infer that gold_daily_totals depends on silver_txns from the dlt.read call. There is no DAG file. There is no separate Great Expectations suite. Lineage is generated for free in Unity Catalog, including column-level edges.

If you want to query how the expectations have been performing (useful for SLO dashboards or alerting), the event log surfaces it directly:

SQL
-- Pass / fail counts for the first expectation of each flow,
-- plus rows dropped by the flow, last 24 hours
SELECT
  flow_name,
  details:flow_progress.data_quality.expectations[0].name AS exp_name,
  details:flow_progress.data_quality.expectations[0].passed_records AS passed,
  details:flow_progress.data_quality.expectations[0].failed_records AS failed,
  details:flow_progress.data_quality.dropped_records AS dropped,
  timestamp
FROM event_log("<pipeline-id>")
WHERE event_type = 'flow_progress'
  AND timestamp >= current_timestamp() - INTERVAL 1 DAY
ORDER BY timestamp DESC;

8. Putting It Together: Where to Start, What to Measure

Adopting all of this at once is a recipe for pain.
The order I've seen work, and a small set of metrics to verify the change is paying off:

Step | Adopt | Retire | Verify with
1 | Predictive optimization at schema level | Nightly OPTIMIZE / VACUUM jobs | Reduction in maintenance-cluster cost
2 | Liquid clustering on top 5 tables | Static partitioning + Z-ORDER | p95 query latency on the same workloads
3 | Auto Loader for 1-2 ingestion pipelines | Custom file-tracking + listing logic | End-to-end data freshness
4 | Lakeflow pipelines for new pipelines only | External DQ + DAG glue (for new work) | Lines of pipeline code per table
5 | Serverless compute for SQL warehouses + DLT | Hand-sized job clusters | Cost-per-query, scale-up time

What you do not need to migrate: imperative pipelines that already work and aren't growing. Declarative patterns are about new work and high-pain hot spots, not a heroic rewrite of every notebook ever shipped.

9. Honest Limitations and Where Imperative Still Wins

Four places where the declarative model still bites, worth knowing before you commit:

• Procedural logic still belongs in Jobs. If your pipeline is really a sequence of API calls with branching error handling, that's a Lakeflow Job (or external code), not a declarative table. Don't try to bend dlt around it.
• Predictive optimization needs observation time. On a table that's a week old, the engine hasn't seen enough patterns to make great decisions. For tables under heavy initial load, an explicit OPTIMIZE FULL after the first big ingest still helps.
• Cluster-by-column choice still matters. CLUSTER BY AUTO is great for stable workloads with predictable filters. For tables whose access pattern is genuinely heterogeneous across teams, an explicit cluster-by based on the dominant query is usually faster.
• Hint-driven escapes are still allowed. If a particular query benefits from a /*+ BROADCAST(t) */ hint and AQE isn't catching it, the hint is fine. Just keep them rare and document why.
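One way to locate the hand-written maintenance statements and plan hints discussed above is to scan notebook or job source for them before deciding what to retire. The sketch below is a minimal example under stated assumptions: `find_imperative_tuning` and its regular expressions are illustrative inventions, not part of any Databricks tooling, and plain pattern matching will produce false positives (for instance, the word OPTIMIZE inside a comment).

```python
import re

# Illustrative, non-exhaustive patterns for imperative tuning left in source code.
PATTERNS = {
    "optimize": re.compile(r"\bOPTIMIZE\s+[\w.`]+", re.IGNORECASE),
    "zorder": re.compile(r"\bZORDER\s+BY\b", re.IGNORECASE),
    "vacuum": re.compile(r"\bVACUUM\s+[\w.`]+", re.IGNORECASE),
    "analyze": re.compile(r"\bANALYZE\s+TABLE\b", re.IGNORECASE),
    "hint": re.compile(r"/\*\+\s*\w+"),
}


def find_imperative_tuning(source):
    """Return (line_number, kind, stripped_line) for every pattern hit in `source`."""
    findings = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for kind, pattern in PATTERNS.items():
            if pattern.search(line):
                findings.append((lineno, kind, line.strip()))
    return findings


sample = """OPTIMIZE prod.silver.txns ZORDER BY (account_id);
SELECT /*+ BROADCAST(t) */ * FROM t;
VACUUM prod.silver.txns RETAIN 168 HOURS;
SELECT 1;"""

for lineno, kind, text in find_imperative_tuning(sample):
    print(lineno, kind, text)
```

The output is only a review list: each hit still needs a human decision about whether the statement is redundant or one of the documented exceptions.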
Conclusion The declarative optimization story isn't a single feature you toggle — it's a quiet shift in who owns the boring parts of a Spark pipeline. Layout, maintenance, ingestion bookkeeping, plan tuning, cluster sizing, data quality enforcement: every one of those was traditionally a thing the team owned and paid for in toil. The current Databricks stack lets you express each as an intent and let the engine handle the operations underneath. Adopt them in order, retire what they replace, and the optimization treadmill slows from a daily concern to a quarterly review. That's the actual win, and it's the reason the declarative paradigm has gone from a Lakeflow detail to the default mental model for new pipelines on Databricks.

By Seshendranath Balla Venkata
Event-Driven Pipelines With Apache Pulsar and Go

A Practical Walkthrough

Most distributed systems eventually hit a wall with their messaging layer, whether it's Kafka's tight coupling between compute and storage, RabbitMQ's limited replay capabilities, or the operational overhead of managing multiple tools for queuing and streaming. Apache Pulsar was engineered to address these gaps from the ground up. In this article, we'll dissect a working Go-based demo that wires together a Pulsar producer, consumer, and Prometheus monitoring layer into a cohesive, observable messaging pipeline. The full source is on GitHub.

Why Pulsar Deserves a Closer Look

Pulsar's architecture makes a deliberate trade-off that most messaging systems avoid: it physically separates the broker tier (which handles routing, subscriptions, and protocol) from the storage tier (Apache BookKeeper, which handles persistence). This isn't just an implementation detail. It means you can independently autoscale message routing capacity without touching your storage cluster, and vice versa.

Beyond the architecture, a few capabilities stand out for engineering teams:

  • Multi-tenancy at the protocol level – tenants, namespaces, and topics form a three-level hierarchy, making it practical to run a single Pulsar cluster for multiple teams or services without namespace collisions.
  • Four distinct subscription semantics – Exclusive, Shared, Failover, and Key_Shared give you precise control over how messages are distributed across consumer instances, something Kafka's consumer group model doesn't natively offer.
  • Cursor-based message retention – Pulsar retains messages based on subscription cursors, not time-based log retention. A consumer that falls behind doesn't lose messages; it simply catches up from its last acknowledged position.
  • Native schema enforcement – The built-in schema registry validates message payloads at the broker level before they reach consumers, catching contract violations at the boundary rather than deep inside application logic.
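To make the three-level hierarchy concrete, here is a minimal, language-neutral sketch in Python (the demo itself is Go) that splits a fully qualified topic name of the form `persistent://tenant/namespace/topic` into its parts. The `TopicName` type and the `parse_topic` helper are hypothetical names used only for illustration; they are not part of any Pulsar client library.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    """Parts of a fully qualified Pulsar topic name (illustrative type)."""
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    topic: str


def parse_topic(full_name: str) -> TopicName:
    """Split 'domain://tenant/namespace/topic' into its four parts."""
    domain, sep, rest = full_name.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"missing or unknown domain in {full_name!r}")
    # Split at most twice so anything after the namespace stays in the topic part.
    parts = rest.split("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic in {full_name!r}")
    tenant, namespace, topic = parts
    return TopicName(domain, tenant, namespace, topic)
```

A short name such as `my-topic`, as used later in the demo, is shorthand for `persistent://public/default/my-topic`, so the sketch deliberately rejects it and expects the expanded form.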
What the Demo Builds

The project is structured as three independent Go binaries, each with a single responsibility:

Plain Text
├── producer/
│   ├── main.go        # HTTP server → Pulsar publisher
│   ├── go.mod
│   └── go.sum
├── consumer/
│   └── main.go        # Pulsar subscriber → message processor
├── monitor/
│   └── main.go        # Pulsar producer + Prometheus metrics server
├── prometheus.yml     # Scrape configuration
└── README.md

All three use `github.com/apache/pulsar-client-go/pulsar`, the official, Apache-maintained Go client. The client is not a thin wrapper; it implements the full Pulsar binary protocol, connection pooling, producer batching, and automatic Prometheus metric registration.

Component 1: The Producer

The producer exposes a single HTTP endpoint. An incoming HTTP request triggers a Pulsar publish operation, decoupling the caller from any direct knowledge of the messaging infrastructure.

Go
package main

import (
    "context"
    "log"
    "net/http"

    "github.com/apache/pulsar-client-go/pulsar"
)

var client pulsar.Client

func main() {
    var err error
    client, err = pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
        msg := r.URL.Query().Get("msg")
        if err := publishMessage(msg); err != nil {
            // Report the failure to the caller instead of killing the server.
            log.Printf("publish failed: %v", err)
            http.Error(w, "msg failed to publish", http.StatusInternalServerError)
            return
        }
        w.Write([]byte("msg successfully published"))
    })

    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal(err)
    }
}

// publishMessage creates a short-lived producer, sends msg, and closes the producer.
func publishMessage(msg string) error {
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic",
    })
    if err != nil {
        return err
    }
    defer producer.Close()

    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte(msg), // publish the caller's message, not a fixed string
    })
    return err
}

A few architectural observations worth unpacking:

The HTTP-to-Pulsar bridge pattern is deliberately pragmatic.
Rather than requiring every upstream service to embed a Pulsar client, you expose a thin HTTP adapter. This is particularly valuable when integrating with systems that speak HTTP natively: CI/CD pipelines, third-party webhooks, or legacy services that can't easily adopt a new client library.

`pulsar.NewClient` establishes a connection pool, not a single TCP connection. The client maintains persistent connections to the broker and handles reconnection, load balancing across broker nodes, and TLS negotiation transparently. Deferring `client.Close()` releases those connections when `main` returns normally; because `log.Fatal` exits the process without running deferred calls, a long-running service should also handle shutdown signals explicitly.

`producer.Send` with `context.Background()` blocks until the broker acknowledges the message. The Pulsar client batches outgoing messages by default (configurable via `BatchingMaxMessages` and `BatchingMaxPublishDelay`), which improves throughput mainly when messages are queued with `SendAsync` rather than sent one synchronous call at a time.

For production use, the producer instance should be created once at startup and reused across requests. Creating a new producer per request incurs connection overhead and bypasses the batching optimization entirely.

Component 2: The Consumer

The consumer subscribes to a topic and processes messages with explicit acknowledgment. The subscription type chosen here, `pulsar.Shared`, has meaningful implications for how the system scales.

Go
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Receive and acknowledge ten messages, then exit.
    for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Received message with Id: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
        consumer.Ack(msg)
    }

    if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
    }
}

Subscription Semantics in Depth

Pulsar's subscription model is one of its most differentiating features.
Here's how the four types behave at the broker level:

  • Exclusive – The broker enforces that only one consumer holds the subscription at any time. A second consumer attempting to subscribe with the same name will receive an error. This guarantees strict message ordering but eliminates horizontal scaling.
  • Shared – The broker distributes messages across all active consumers in round-robin order. Any number of consumers can join or leave the subscription dynamically. This is the right choice for stateless workloads where processing order doesn't matter and throughput is the priority.
  • Failover – The broker designates one consumer as the active receiver. Others remain connected but idle, ready to take over if the active consumer disconnects. This preserves ordering while providing high availability, a pattern common in financial transaction processing.
  • Key_Shared – The broker routes messages with the same key consistently to the same consumer instance. This enables stateful processing (e.g., per-user session aggregation) without external coordination, as long as the consumer count remains stable.

Why Explicit Acknowledgment Matters

`consumer.Ack(msg)` signals to the broker that the message has been durably processed and can be removed from the subscription's cursor. If the consumer process crashes between `Receive` and `Ack`, the broker will redeliver the message to another consumer in the subscription. This is the mechanism behind at-least-once delivery.

For workloads that require exactly-once semantics, Pulsar supports transactional acknowledgment, where the `Ack` and any downstream writes are committed atomically. That's a more advanced topic, but the foundation is the same `Ack` call shown here.

Component 3: The Monitor

The monitor is architecturally the most interesting component. It runs two HTTP servers concurrently (one for the application endpoint, one for Prometheus metrics) and uses a Pulsar producer to generate observable traffic.
Go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "strconv"

    "github.com/apache/pulsar-client-go/pulsar"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Expose the default Prometheus registry, where the Pulsar client registers its collectors.
    http.Handle("/metrics", promhttp.Handler())

    prometheusPort := 2112
    go func() {
        if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
            log.Fatal(err)
        }
    }()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "topic-1",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    ctx := context.Background()
    webPort := 8082
    http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
        msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte("hello-world"),
        })
        if err != nil {
            // A failed publish should fail the request, not terminate the process.
            log.Printf("publish failed: %v", err)
            http.Error(w, "publish failed", http.StatusInternalServerError)
            return
        }
        log.Printf("Published message: %v", msgID)
        fmt.Fprintf(w, "Message Published: %v", msgID)
    })

    if err := http.ListenAndServe(":"+strconv.Itoa(webPort), nil); err != nil {
        log.Fatal(err)
    }
}

How the Pulsar Client Registers Prometheus Metrics

When `pulsar.NewClient` is called, the Go client automatically registers a set of Prometheus collectors with the default `prometheus.DefaultRegisterer`. No per-metric instrumentation code is required; the application only has to expose the default registry over HTTP, which the `promhttp.Handler()` route in the listing does. The metrics are served at `/metrics` on whichever port `http.DefaultServeMux` is bound to, in this case port `2112`.

The metrics exposed include:

  • `pulsar_client_producers_opened` / `pulsar_client_producers_closed` – producer lifecycle counters
  • `pulsar_client_consumers_opened` / `pulsar_client_consumers_closed` – consumer lifecycle counters
  • `pulsar_client_messages_published_total` – cumulative publish count per topic
  • `pulsar_client_publish_latency_seconds` – histogram of end-to-end publish latency
  • `pulsar_client_bytes_published_total` – total bytes written to the broker

Running the Prometheus metrics server in a goroutine while the main goroutine handles the application HTTP server is idiomatic Go concurrency. Both servers share the same `http.DefaultServeMux`, so every registered route is reachable on both ports: `/metrics` answers on the metrics port, and as a side effect `/produce` answers there too. Giving each server its own `http.ServeMux` would keep the two surfaces separate.

Prometheus Scrape Configuration

YAML
scrape_configs:
  - job_name: pulsar-client-go-metrics
    scrape_interval: 10s
    static_configs:
      - targets:
          - localhost:2112

The `scrape_interval: 10s` is a reasonable starting point for development. In production, you would typically align this with your alerting resolution requirements: a 30-second interval is common for dashboards, while 10 seconds or less is appropriate for latency-sensitive alerting rules.

With these metrics flowing into Prometheus, you can build Grafana panels that surface producer throughput, consumer lag, and publish latency percentiles, the three signals that matter most when diagnosing messaging pipeline issues.

Running the Full Pipeline

Prerequisites

  • Apache Pulsar standalone
  • Go 1.18+
  • Prometheus (optional)

Startup Sequence

1. Launch Pulsar in standalone mode:

Shell
bin/pulsar standalone

2. Start the producer service:

Shell
cd producer && go run main.go

3. Start the consumer service:

Shell
cd consumer && go run main.go

4. Start the monitor service:

Shell
cd monitor && go run main.go

5. Start Prometheus:

Shell
prometheus --config.file=prometheus.yml

6. Publish a message via the producer endpoint:

Shell
curl "http://localhost:8080/publish?msg=test_pulsar_message_publish_event"
# msg successfully published

7. Trigger the monitor producer:

Shell
curl http://localhost:8082/produce
# Message Published: (messageId)

8. Inspect raw Prometheus metrics:

Shell
curl http://localhost:2112/metrics | grep pulsar

Engineering Takeaways

  • Decouple publish triggers from client library dependencies. The HTTP-to-Pulsar adapter pattern used in the producer is not just a demo convenience. It is a legitimate architectural boundary.
Services that need to emit events don't need to know anything about Pulsar's protocol, topic naming, or client configuration. They make an HTTP call; the adapter handles the rest.
  • Match subscription type to processing semantics, not just throughput. A common mistake is defaulting to `Shared` for everything because it scales horizontally. If your processing logic is stateful (for example, aggregating events per user session), `Key_Shared` gives you the same horizontal scalability while preserving per-key ordering without any application-level coordination.
  • Treat the acknowledgment boundary as your consistency boundary. Everything between `Receive` and `Ack` is your processing window. Any side effects (database writes, downstream API calls, cache updates) that happen in this window must be idempotent, because Pulsar will redeliver the message if the consumer fails before acknowledging. Design your processing logic around this constraint from the start, not as an afterthought.
  • Zero-cost observability is a genuine advantage. The fact that `pulsar-client-go` registers Prometheus metrics automatically means you get throughput, latency, and connection health data from the moment your application starts, without writing a single line of instrumentation code. This is a meaningful operational advantage over client libraries that require manual metric registration.

Extending the Demo

The current implementation is intentionally minimal. Here are technically meaningful extensions worth exploring:

  • Schema enforcement – Replace raw `[]byte` payloads with Pulsar's schema-aware producer/consumer API. Using `pulsar.NewAvroSchema` or `pulsar.NewJSONSchema` moves payload validation to the broker, preventing malformed messages from ever reaching consumers.
  • Dead-letter topic routing – Configure `DeadLetterPolicy` on the consumer to automatically route messages that exceed a maximum redelivery count to a separate topic. This prevents poison-pill messages from blocking the subscription indefinitely.
  • Producer batching tuning – Set `BatchingMaxMessages`, `BatchingMaxSize`, and `BatchingMaxPublishDelay` on `ProducerOptions` to optimize the throughput/latency trade-off for your specific workload profile.
  • Graceful shutdown – Add `os/signal` handling to flush in-flight messages and close the producer cleanly before the process exits. The current `defer client.Close()` handles the happy path but does not run when the process is terminated by a signal such as `SIGINT` or `SIGTERM` (and `SIGKILL` cannot be handled at all).
  • Kubernetes-native deployment – Package each component as a container and deploy using the official Pulsar Helm chart. The producer and monitor can be exposed as Kubernetes Services; the consumer can run as a Deployment with HPA scaling based on the `pulsar_client_messages_published_total` metric exported to a custom metrics adapter.

Conclusion

The `apache-pulsar-demo` project demonstrates that building a production-grade messaging pipeline with Apache Pulsar and Go doesn't require much code. It requires understanding the right abstractions. The producer-consumer-monitor triad covers the three concerns that matter in any event-driven system: getting data in, getting data out, and knowing what's happening in between.

Pulsar's architecture, decoupled storage, flexible subscription semantics, and built-in observability make it a strong candidate for teams that have outgrown simpler messaging systems and need more precise control over delivery guarantees, scaling behavior, and operational visibility.

Source Code

https://github.com/shivik/apache-pulsar-demo

By Shivi Kashyap
Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka

After implementing contract-first integration across three different microservices architectures, I've learned that the biggest bottleneck in distributed systems isn't technical; it's coordination between teams. When Team A waits for Team B to finish their API before starting integration work, you're throwing away weeks of productivity. Contract-first development flips this model. By defining your integration contracts upfront (OpenAPI specs, Avro schemas, database migrations), you enable teams to work in parallel, catch breaking changes early through CI validation, and treat contracts as the single source of truth. This isn't theoretical; this is how Netflix, Uber, and Amazon scale their engineering organizations. In this article, I'll show you production-ready contract-first patterns using Java 21, Spring Boot 3, OpenAPI 3, Apache Kafka with Avro, and Flyway migrations. You'll see real code from a working system that handles the three critical integration boundaries: REST APIs, event-driven messaging, and database schemas. The Problem: Why Traditional Integration Fails at Scale When systems integrate without contracts, you hit three major problems: 1. Serial development bottlenecks: Team A builds an API endpoint. Team B waits. Team B builds a consumer. Team A discovers the payload doesn't match what Team B expected. Both teams spend days debugging mismatched assumptions. 2. Late discovery of breaking changes: You deploy a service update that changes a response field from customerId to customer_id. Your API consumers break in production. No tests caught it because there was no contract to validate against. 3. Documentation drift: The Swagger docs say the endpoint returns a 201. The actual code returns a 200. The integration tests expect a 404. Nobody knows which one is right because there's no single source of truth. 
Contract-first development solves all three by making the contract the authoritative specification that generates code, mocks, tests, and documentation.

What Contract-First Actually Means

Contract-first means you define the integration boundary first (the contract) and then write code that conforms to it. The contract is not an afterthought or generated documentation. It's the design specification.

A complete contract includes:

  • Operations: Endpoints (REST), topics (Kafka), or tables (database)
  • Data shapes: Request/response DTOs, event schemas, column definitions
  • Validation rules: Required fields, constraints, data types
  • Error model: HTTP status codes, error payloads, dead-letter queues
  • Non-functional rules: Idempotency, retries, compatibility policies, SLAs

Here's the mental model: Agree on the contract → generate tools → build independently → let CI enforce alignment.

Contract Type 1: REST API Contracts With OpenAPI

OpenAPI 3 specs are the gold standard for REST API contracts. You define endpoints, request/response schemas, validation rules, and error responses in YAML. Then you generate server stubs, client SDKs, mocks, and documentation from that single source.
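The "let CI enforce alignment" step can be illustrated with a small check that compares two versions of a response schema and flags changes that would break existing consumers, such as the customerId to customer_id rename described earlier. This is a minimal sketch in Python rather than the article's Java, and it only understands plain dicts shaped like OpenAPI object schemas; `breaking_changes` is a hypothetical helper, not part of any OpenAPI tooling, and real pipelines would more likely rely on a dedicated diff tool.

```python
def breaking_changes(old: dict, new: dict) -> list[str]:
    """List consumer-breaking differences between two response schemas.

    Both arguments are dicts shaped like an OpenAPI object schema
    ({"properties": {name: {"type": ...}}}). Only two kinds of change are
    detected: a property that disappeared and a property whose type changed.
    """
    problems = []
    old_props = old.get("properties", {})
    new_props = new.get("properties", {})
    for name, spec in old_props.items():
        if name not in new_props:
            # Consumers reading this field will no longer find it.
            problems.append(f"removed property: {name}")
        elif spec.get("type") != new_props[name].get("type"):
            problems.append(f"type changed: {name}")
    return problems
```

A CI job could run a check like this against the previous and proposed contract and fail the build when the returned list is non-empty. Added properties are deliberately not reported, since they do not break consumers that ignore unknown fields.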
OpenAPI Contract Example

Here's a production OpenAPI contract for an order management API:

YAML
openapi: 3.2.0
info:
  title: Orders API
  version: 1.0.0
  description: Contract-first REST API for order management
paths:
  /v1/orders:
    post:
      operationId: createOrder
      summary: Create a new order
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/CreateOrderRequest'
      responses:
        '201':
          description: Order created successfully
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/OrderResponse'
        '400':
          description: Validation error
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
        '409':
          description: Idempotency conflict
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ErrorResponse'
  /v1/orders/{orderId}:
    get:
      operationId: getOrder
      parameters:
        - name: orderId
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          description: Order found
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/OrderResponse'
        '404':
          description: Order not found
components:
  schemas:
    CreateOrderRequest:
      type: object
      required: [customerId, items]
      properties:
        customerId:
          type: string
          example: CUST-123
        idempotencyKey:
          type: string
          description: Optional key for safe retries
        items:
          type: array
          minItems: 1
          items:
            $ref: '#/components/schemas/OrderItem'
    OrderItem:
      type: object
      required: [sku, quantity]
      properties:
        sku:
          type: string
          example: SKU-001
        quantity:
          type: integer
          minimum: 1
    OrderResponse:
      type: object
      required: [orderId, customerId, status, items, timestamp]
      properties:
        orderId:
          type: string
        customerId:
          type: string
        status:
          type: string
          enum: [CREATED, REJECTED]
        items:
          type: array
          items:
            $ref: '#/components/schemas/OrderItem'
        timestamp:
          type: string
          format: date-time
    ErrorResponse:
      type: object
      required: [code, message, traceId, timestamp]
      properties:
        code:
          type: string
          enum: [VALIDATION_ERROR, NOT_FOUND, CONFLICT, INTERNAL_ERROR]
        message:
          type: string
        traceId:
          type: string
        timestamp:
          type: string
          format: date-time

Implementing the Provider Side (Spring Boot)

The contract drives the implementation. Your Spring Boot controller implements what the contract specifies:

Java
@RestController
@RequestMapping("/v1/orders")
@RequiredArgsConstructor
@Slf4j
public class OrderController {

    private final OrderService orderService;

    /**
     * POST /v1/orders
     * Contract: contracts/openapi/orders-api.v1.yaml
     */
    @PostMapping
    public ResponseEntity<OrderResponse> createOrder(
            @Valid @RequestBody CreateOrderRequest request) {
        log.info("Creating order for customer: {}", request.customerId());
        OrderResponse response = orderService.createOrder(request);
        return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(response);
    }

    /**
     * GET /v1/orders/{orderId}
     */
    @GetMapping("/{orderId}")
    public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) {
        OrderResponse response = orderService.getOrder(orderId)
                .orElseThrow(() -> new ResourceNotFoundException("Order not found: " + orderId));
        return ResponseEntity.ok(response);
    }
}

Critical implementation details:

  • DTOs match contract schemas exactly: CreateOrderRequest and OrderResponse are Java records generated from or validated against the OpenAPI spec
  • HTTP status codes match contract: 201 for creation, 404 for not found, 409 for idempotency conflicts
  • Validation is enforced: @Valid annotation ensures request validation matches OpenAPI constraints
  • Error responses are standardized: All errors return ErrorResponse with consistent structure

Consumer Parallel Development

Here's where contract-first shines. While your team implements the provider, the consumer team can:

  • Generate a Java client from the OpenAPI spec using openapi-generator
  • Run a mock server that returns valid responses based on the contract
  • Write integration tests against the mock
  • Switch to the real service when it's ready (no code changes needed)

The consumer doesn't wait for you to finish. They develop in parallel.
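The mock-server step can be sketched with nothing but a standard library. The example below is a hedged illustration in Python rather than the article's Java stack: a hand-written stub that answers POST /v1/orders with a canned body satisfying the OrderResponse schema's required fields, plus a tiny client that a consumer team could test against it. `MockOrdersHandler`, `start_mock`, `create_order`, and the canned values are all hypothetical; a real setup would more likely generate the mock from the OpenAPI file with dedicated tooling.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import request

# Canned body covering every required field of the OrderResponse schema.
CANNED_ORDER = {
    "orderId": "ORD-1",
    "customerId": "CUST-123",
    "status": "CREATED",
    "items": [{"sku": "SKU-001", "quantity": 1}],
    "timestamp": "2024-01-01T00:00:00Z",
}


class MockOrdersHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read and discard the request body; the stub always returns the same order.
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)
        if self.path != "/v1/orders":
            self.send_error(404)
            return
        body = json.dumps(CANNED_ORDER).encode()
        self.send_response(201)  # the contract specifies 201 for creation
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep test output quiet


def start_mock() -> HTTPServer:
    """Start the stub on a free local port in a background thread."""
    server = HTTPServer(("127.0.0.1", 0), MockOrdersHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def create_order(base_url: str, payload: dict):
    """Consumer-side call: POST the payload and return (status, parsed JSON body)."""
    req = request.Request(
        f"{base_url}/v1/orders",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # An empty ProxyHandler bypasses any proxy configured in the environment.
    opener = request.build_opener(request.ProxyHandler({}))
    with opener.open(req, timeout=5) as resp:
        return resp.status, json.loads(resp.read())
```

Because `create_order` only takes a base URL, pointing it at the real service later is a configuration change, which is the "no code changes needed" property described in the list above.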
Contract Type 2: Event Contracts With Kafka and Avro

Event-driven systems need two contract layers: topic semantics (human-readable) and schema definitions (machine-validated).

Topic Semantics Contract

Document the operational contract for each topic:

Markdown
## Topic: orders.order-created.v1
- **Purpose**: Emitted when an order is created successfully
- **Key**: orderId (partition affinity per order)
- **Delivery**: At-least-once (consumers must be idempotent)
- **Consumer requirement**: Deduplicate by eventId
- **Retry policy**: Consumer retries transient errors
- **DLQ**: orders.order-created.v1.dlq for poison messages
- **Compatibility**: Backward compatible schema evolution required

Avro Schema Contract

The Avro schema is your machine-validated contract:

JSON
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.acme.events",
  "doc": "Event emitted when an order is successfully created",
  "fields": [
    { "name": "eventId", "type": "string", "doc": "Unique event ID for idempotent processing" },
    { "name": "occurredAt", "type": "string", "doc": "ISO 8601 timestamp" },
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    {
      "name": "source",
      "type": ["null", "string"],
      "default": null,
      "doc": "Order source (WEB, MOBILE, API). Nullable for backward compatibility."
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "sku", "type": "string"},
            {"name": "quantity", "type": "int"}
          ]
        }
      }
    }
  ]
}

Key pattern: The source field is nullable with a default value. This supports backward-compatible evolution; old consumers can read new events, and new consumers can read old events.
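Why the default matters can be shown with a toy version of the rule a reader applies when its schema has a field the writer never sent: use the field's default if one is declared, and fail otherwise. The sketch below is written in Python with plain dicts and does not use an Avro library; `resolve` and `READER_FIELDS` are hypothetical names, and real Avro schema resolution covers many more cases (type promotion, unions, nested records).

```python
# Reader-side view of OrderCreated, reduced to the fields needed for the example.
READER_FIELDS = [
    {"name": "eventId"},
    {"name": "orderId"},
    {"name": "source", "default": None},  # nullable field with a declared default
]


def resolve(record: dict, reader_fields: list) -> dict:
    """Project a decoded record onto the reader's fields.

    Fields the writer sent are copied, fields it did not send fall back to the
    reader's default, and a missing field without a default is an error.
    Writer fields unknown to the reader are silently dropped.
    """
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved
```

An event produced before `source` existed resolves with `source` set to None, while an event from a newer producer that carries extra fields still resolves for a reader that does not know them, which is the two-way compatibility the key pattern describes.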
Kafka Producer Implementation

Java
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishOrderCreated(OrderCreated event) {
        String key = event.getOrderId();
        log.debug("Publishing OrderCreated: orderId={}, eventId={}", key, event.getEventId());

        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send("orders.order-created.v1", key, event);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Published OrderCreated: orderId={}, partition={}, offset={}",
                        key,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                log.error("Failed to publish OrderCreated: orderId={}", key, ex);
            }
        });
    }
}

Production concerns addressed:

  • Key-based partitioning: Using orderId as the key ensures all events for the same order go to the same partition, maintaining ordering
  • Async publishing with callbacks: Non-blocking publish with explicit success/failure handling
  • Structured logging: Captures partition and offset for troubleshooting

Kafka Consumer With Idempotency

At-least-once delivery means duplicates are possible. Consumers must deduplicate:

Java
@KafkaListener(topics = "orders.order-created.v1", groupId = "billing-service")
public void onOrderCreated(OrderCreated event) {
    // Check if already processed
    if (processedEventsRepository.existsByEventId(event.getEventId())) {
        log.debug("Skipping duplicate event: {}", event.getEventId());
        return;
    }

    // Process event
    billingService.createInvoice(
            event.getOrderId(),
            event.getCustomerId(),
            event.getItems()
    );

    // Mark as processed
    processedEventsRepository.save(
            new ProcessedEvent(event.getEventId(), Instant.now())
    );
}

Idempotency pattern: Check eventId before processing, store it after processing. If the same event arrives twice, the second one is ignored.
Architecture Diagram: Contract-First Flow After implementing contract-first integration across three different microservices architectures, I've learned that the biggest bottleneck in distributed systems isn't technical; it's coordination between teams. When Team A waits for Team B to finish their API before starting integration work, you're throwing away weeks of productivity. Contract-first development flips this model. By defining your integration contracts upfront (OpenAPI specs, Avro schemas, database migrations), you enable teams to work in parallel, catch breaking changes early through CI validation, and treat contracts as the single source of truth. This isn't theoretical; this is how Netflix, Uber, and Amazon scale their engineering organizations. In this article, I'll show you production-ready contract-first patterns using Java 21, Spring Boot 3, OpenAPI 3, Apache Kafka with Avro, and Flyway migrations. You'll see real code from a working system that handles the three critical integration boundaries: REST APIs, event-driven messaging, and database schemas. The Problem: Why Traditional Integration Fails at Scale When systems integrate without contracts, you hit three major problems: 1. Serial development bottlenecks: Team A builds an API endpoint. Team B waits. Team B builds a consumer. Team A discovers the payload doesn't match what Team B expected. Both teams spend days debugging mismatched assumptions. 2. Late discovery of breaking changes: You deploy a service update that changes a response field from customerId to customer_id. Your API consumers break in production. No tests caught it because there was no contract to validate against. 3. Documentation drift: The Swagger docs say the endpoint returns a 201. The actual code returns a 200. The integration tests expect a 404. Nobody knows which one is right because there's no single source of truth. 
Contract-first development solves all three by making the contract the authoritative specification that generates code, mocks, tests, and documentation. What Contract-First Actually Means Contract-first means you define the integration boundary first (the contract) and then write code that conforms to it. The contract is not an afterthought or generated documentation. It's the design specification. A complete contract includes: Operations: Endpoints (REST), topics (Kafka), or tables (database)Data shapes: Request/response DTOs, event schemas, column definitionsValidation rules: Required fields, constraints, data typesError model: HTTP status codes, error payloads, dead-letter queuesNon-functional rules: Idempotency, retries, compatibility policies, SLAs Here's the mental model: Agree on the contract → generate tools → build independently → let CI enforce alignment. Contract Type 1: REST API Contracts With OpenAPI OpenAPI 3 specs are the gold standard for REST API contracts. You define endpoints, request/response schemas, validation rules, and error responses in YAML. Then you generate server stubs, client SDKs, mocks, and documentation from that single source. 
OpenAPI Contract Example Here's a production OpenAPI contract for an order management API: YAML openapi: 3.2.0 info: title: Orders API version: 1.0.0 description: Contract-first REST API for order management paths: /v1/orders: post: operationId: createOrder summary: Create a new order requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/CreateOrderRequest' responses: '201': description: Order created successfully content: application/json: schema: $ref: '#/components/schemas/OrderResponse' '400': description: Validation error content: application/json: schema: $ref: '#/components/schemas/ErrorResponse' '409': description: Idempotency conflict content: application/json: schema: $ref: '#/components/schemas/ErrorResponse' /v1/orders/{orderId}: get: operationId: getOrder parameters: - name: orderId in: path required: true schema: type: string responses: '200': description: Order found content: application/json: schema: $ref: '#/components/schemas/OrderResponse' '404': description: Order not found components: schemas: CreateOrderRequest: type: object required: [customerId, items] properties: customerId: type: string example: CUST-123 idempotencyKey: type: string description: Optional key for safe retries items: type: array minItems: 1 items: $ref: '#/components/schemas/OrderItem' OrderItem: type: object required: [sku, quantity] properties: sku: type: string example: SKU-001 quantity: type: integer minimum: 1 OrderResponse: type: object required: [orderId, customerId, status, items, timestamp] properties: orderId: type: string customerId: type: string status: type: string enum: [CREATED, REJECTED] items: type: array items: $ref: '#/components/schemas/OrderItem' timestamp: type: string format: date-time ErrorResponse: type: object required: [code, message, traceId, timestamp] properties: code: type: string enum: [VALIDATION_ERROR, NOT_FOUND, CONFLICT, INTERNAL_ERROR] message: type: string traceId: type: string timestamp: type: string 
format: date-time Implementing the Provider Side (Spring Boot) The contract drives the implementation. Your Spring Boot controller implements what the contract specifies: Java @RestController @RequestMapping("/v1/orders") @RequiredArgsConstructor @Slf4j public class OrderController { private final OrderService orderService; /** * POST /v1/orders * Contract: contracts/openapi/orders-api.v1.yaml */ @PostMapping public ResponseEntity<OrderResponse> createOrder( @Valid @RequestBody CreateOrderRequest request) { log.info("Creating order for customer: {}", request.customerId()); OrderResponse response = orderService.createOrder(request); return ResponseEntity .status(HttpStatus.CREATED) .body(response); } /** * GET /v1/orders/{orderId} */ @GetMapping("/{orderId}") public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) { OrderResponse response = orderService.getOrder(orderId) .orElseThrow(() -> new ResourceNotFoundException("Order not found: " + orderId)); return ResponseEntity.ok(response); } } Critical implementation details: DTOs match contract schemas exactly: CreateOrderRequest and OrderResponse are Java records generated from or validated against the OpenAPI specHTTP status codes match contract: 201 for creation, 404 for not found, 409 for idempotency conflictsValidation is enforced: @Valid annotation ensures request validation matches OpenAPI constraintsError responses are standardized: All errors return ErrorResponse with consistent structure Consumer Parallel Development Here's where contract-first shines. While your team implements the provider, the consumer team can: Generate a Java client from the OpenAPI spec using openapi-generatorRun a mock server that returns valid responses based on the contractWrite integration tests against the mockSwitch to the real service when it's ready (no code changes needed) The consumer doesn't wait for you to finish. They develop in parallel. 
Contract Type 2: Event Contracts With Kafka and Avro Event-driven systems need two contract layers: topic semantics (human-readable) and schema definitions (machine-validated). Topic Semantics Contract Document the operational contract for each topic: Markdown ## Topic: orders.order-created.v1 - **Purpose**: Emitted when an order is created successfully - **Key**: orderId (partition affinity per order) - **Delivery**: At-least-once (consumers must be idempotent) - **Consumer requirement**: Deduplicate by eventId - **Retry policy**: Consumer retries transient errors - **DLQ**: orders.order-created.v1.dlq for poison messages - **Compatibility**: Backward compatible schema evolution required Avro Schema Contract The Avro schema is your machine-validated contract: JSON { "type": "record", "name": "OrderCreated", "namespace": "com.acme.events", "doc": "Event emitted when an order is successfully created", "fields": [ { "name": "eventId", "type": "string", "doc": "Unique event ID for idempotent processing" }, { "name": "occurredAt", "type": "string", "doc": "ISO 8601 timestamp" }, { "name": "orderId", "type": "string" }, { "name": "customerId", "type": "string" }, { "name": "source", "type": ["null", "string"], "default": null, "doc": "Order source (WEB, MOBILE, API). Nullable for backward compatibility." }, { "name": "items", "type": { "type": "array", "items": { "type": "record", "name": "OrderItem", "fields": [ {"name": "sku", "type": "string"}, {"name": "quantity", "type": "int"} ] } } } ] } Key pattern: The source field is nullable with a default value. This supports backward-compatible evolution; old consumers can read new events, and new consumers can read old events. 
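The effect of the nullable default can be illustrated with a small, library-free sketch. This is not how an Avro library is invoked; it only mimics the schema-resolution idea (fields missing from the data are filled from the reader's default, fields unknown to the reader are dropped). The project function and the trimmed V1_FIELDS and V2_FIELDS lists are hypothetical names introduced for illustration.

```python
# Trimmed-down reader schemas: V2 adds the nullable "source" field with a default.
V1_FIELDS = [
    {"name": "eventId", "type": "string"},
    {"name": "orderId", "type": "string"},
]
V2_FIELDS = V1_FIELDS + [
    {"name": "source", "type": ["null", "string"], "default": None},
]


def project(record: dict, reader_fields: list[dict]) -> dict:
    """Read a record through a reader schema, Avro-resolution style (simplified)."""
    result = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            # The writer did not know this field: fall back to the reader's default
            result[name] = field["default"]
        else:
            raise ValueError(f"missing field without default: {name}")
    # Fields present in the record but absent from the reader schema are ignored
    return result


old_event = {"eventId": "e-1", "orderId": "o-1"}                    # written with V1
new_event = {"eventId": "e-2", "orderId": "o-2", "source": "WEB"}   # written with V2

new_consumer_view = project(old_event, V2_FIELDS)  # source filled with None
old_consumer_view = project(new_event, V1_FIELDS)  # source silently dropped
```

If source had been added without a default, the first call would raise, which is the kind of break a compatibility check is meant to catch.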
Kafka Producer Implementation Java @Component @RequiredArgsConstructor @Slf4j public class OrderEventPublisher { private final KafkaTemplate<String, Object> kafkaTemplate; public void publishOrderCreated(OrderCreated event) { String key = event.getOrderId(); log.debug("Publishing OrderCreated: orderId={}, eventId={}", key, event.getEventId()); CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("orders.order-created.v1", key, event); future.whenComplete((result, ex) -> { if (ex == null) { log.info("Published OrderCreated: orderId={}, partition={}, offset={}", key, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } else { log.error("Failed to publish OrderCreated: orderId={}", key, ex); } }); } } Architecture Diagram: Contract-First Flow Figure 1: Contract-first development flow showing parallel team development Note: Solid arrows represent generation or implementation flow. Dashed arrows represent validation relationships. Contract Type 3: Database Contracts With Flyway Database schemas are contracts, too. Flyway migrations let you version and evolve schemas with the same contract-first approach. 
Base Schema Migration File: V1__create_orders.sql SQL CREATE TABLE orders ( id VARCHAR(32) PRIMARY KEY, customer_id VARCHAR(32) NOT NULL, status VARCHAR(16) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT now() ); CREATE TABLE order_items ( order_id VARCHAR(32) NOT NULL REFERENCES orders(id), sku VARCHAR(64) NOT NULL, quantity INT NOT NULL CHECK (quantity > 0), PRIMARY KEY (order_id, sku) ); CREATE INDEX idx_orders_customer_id ON orders(customer_id); Schema Evolution: Expand/Migrate/Contract When you need to add a field without breaking old code, use the expand/migrate/contract pattern: Expand (V2__add_order_source.sql): SQL ALTER TABLE orders ADD COLUMN source VARCHAR(32); Migrate (application code): SQL -- Backfill for existing rows UPDATE orders SET source = 'UNKNOWN' WHERE source IS NULL; Contract (V3__enforce_source_not_null.sql): SQL -- After all systems produce source ALTER TABLE orders ALTER COLUMN source SET NOT NULL; This three-phase approach lets you deploy schema changes without downtime. CI/CD Enforcement: Making Contracts Real Contracts only work if they're enforced. 
Here are the CI gates that prevent breaking changes:

REST API: OpenAPI Breaking Change Detection

Run openapi-diff in CI to catch breaking changes:

YAML # .github/workflows/api-contract-check.yml - name: Check for API breaking changes run: | npx openapi-diff \ main:contracts/openapi/orders-api.v1.yaml \ HEAD:contracts/openapi/orders-api.v1.yaml \ --fail-on-breaking

This fails the build if you:
  • Remove required fields
  • Change field types
  • Remove endpoints
  • Change HTTP status codes

Kafka: Schema Registry Compatibility Check

Point the producer at Schema Registry so that every schema it registers is checked for compatibility:

Properties files # Schema Registry configuration confluent.schema.registry.url=http://schema-registry:8081 spring.kafka.properties.schema.registry.url=http://schema-registry:8081 # Schemas are registered through the registry, which applies its compatibility rules spring.kafka.producer.properties.auto.register.schemas=true spring.kafka.producer.properties.use.latest.version=true

These client properties only route schema registration through the registry. The compatibility level itself (BACKWARD is the Schema Registry default) is configured on the registry or per subject. When a producer tries to register an incompatible schema, the registry rejects it and serialization fails, so the problem surfaces in CI or a test environment before reaching production.

Database: Flyway Validation

Flyway validates migrations on startup:

Properties files spring.flyway.validate-on-migrate=true spring.flyway.baseline-on-migrate=false

If someone edits a migration that has already been applied, or an applied migration file goes missing, Flyway detects the mismatch against its schema history table and fails the deployment. Note that this check compares migration files with the recorded history; changes made by hand directly in the database are not detected by it.
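One way to picture what migration validation does is a checksum comparison between the migration files on disk and the checksums recorded when they were applied. The sketch below is a simplified Python illustration of that idea, not Flyway's actual implementation: the checksum and validate helpers are hypothetical, and CRC32 over the raw file text is used as a stand-in for however the tool normalizes and hashes files.

```python
import zlib


def checksum(sql: str) -> int:
    """Stand-in checksum: CRC32 of the migration text (an assumption for illustration)."""
    return zlib.crc32(sql.encode("utf-8"))


def validate(history: dict[str, int], files: dict[str, str]) -> list[str]:
    """Compare recorded checksums (version -> checksum) with the files on disk."""
    problems = []
    for version, recorded in history.items():
        if version not in files:
            problems.append(f"{version}: applied migration file is missing")
        elif checksum(files[version]) != recorded:
            problems.append(f"{version}: checksum mismatch")
    return problems


# Simulated state: both migrations were applied and their checksums recorded.
files = {
    "V1": "CREATE TABLE orders (id VARCHAR(32) PRIMARY KEY);",
    "V2": "ALTER TABLE orders ADD COLUMN source VARCHAR(32);",
}
history = {version: checksum(sql) for version, sql in files.items()}

# Someone later edits V1 in place instead of adding a new migration.
edited = dict(files, V1=files["V1"] + " -- edited after release")
problems = validate(history, edited)
```

The takeaway for the contract-first workflow is that an applied migration is immutable: changes go into a new versioned file rather than an edit to an old one.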
Integration Flow Diagram

Figure 2: End-to-end flow showing REST → DB → Kafka integration with contracts enforced at each boundary

Real-World Benefits: Production Implementation Experience

After implementing contract-first across multiple services (orders, billing, inventory), teams consistently observe significant improvements:

Integration quality:
  • Substantial reduction in integration bugs reaching production
  • Most remaining bugs are business logic issues, not contract mismatches
  • Clear contracts eliminate ambiguity in integration expectations

Development velocity:
  • Integration cycles measured in weeks rather than months
  • Consumer teams start integration work immediately instead of waiting for provider completion
  • Mock servers enable realistic integration testing without coordination delays

Operational reliability:
  • CI-enforced contract validation prevents breaking changes from reaching production
  • Breaking changes caught during PR review, not after deployment
  • Automated validation ensures compatibility before merge

Documentation accuracy:
  • Documentation generated from OpenAPI specs stays synchronized with implementation by design
  • Swagger UI always reflects actual API behavior as both derive from the same contract
  • No manual documentation maintenance required

Common Pitfalls and How to Avoid Them

Pitfall 1: Treating Contracts as Documentation
  • Wrong approach: Write code first, generate OpenAPI from annotations later.
  • Right approach: Write OpenAPI first, then generate server stubs or validate the implementation against it.

Pitfall 2: Skipping CI Validation
  • Wrong approach: Trust developers to manually check compatibility.
  • Right approach: Automate breaking change detection in CI. Make it a required check.

Pitfall 3: No Schema Evolution Strategy
  • Wrong approach: Add fields without defaults, breaking old consumers.
  • Right approach: All new fields must be optional or have defaults. Test with multiple schema versions.
Pitfall 4: Ignoring Idempotency
  • Wrong approach: Assume exactly-once delivery in Kafka.
  • Right approach: Design consumers to deduplicate by eventId. Assume at-least-once.

Pitfall 5: Coupling Contracts to Implementation
  • Wrong approach: Expose internal database schema directly in API contracts.
  • Right approach: Design contracts for consumers, not internal convenience. Use DTOs that map between the external contract and the internal model.

When Contract-First Makes Sense

Contract-first isn't always the answer. Use it when:
  • ✅ Multiple teams integrate: The coordination cost justifies the upfront contract design effort
  • ✅ Public APIs or partner integrations: External consumers need stability and clear documentation
  • ✅ Microservices architecture: Services must evolve independently without breaking dependents
  • ✅ High change frequency: CI validation catches breaking changes early when changes are frequent

Skip contract-first when:
  • ❌ Single small team: Coordination overhead is low, so formal contracts add friction
  • ❌ Prototyping: You're exploring the problem space and expect major pivots
  • ❌ Internal tools with one consumer: The provider and consumer are maintained by the same person

Key Takeaways
  • Contracts enable parallel development: Provider and consumer teams work simultaneously instead of serially
  • CI validation prevents breaking changes: Automate OpenAPI diffs, schema compatibility checks, and migration validation
  • Idempotency is not optional: At-least-once delivery in Kafka requires consumers to deduplicate by event ID
  • Schema evolution requires backward compatibility: Use nullable fields with defaults to support gradual rollouts
  • Contracts are design specifications, not afterthoughts: Define contracts first, generate code from them

What's Next?
If you're implementing contract-first integration:
  • Start with one integration: pick your most painful cross-team dependency
  • Write the OpenAPI spec or Avro schema first, before any implementation
  • Set up CI validation for that contract (openapi-diff or Schema Registry)
  • Measure reduction in integration bugs and coordination overhead
  • Expand to other integrations once you've proven the pattern

Contract-first isn't a silver bullet, but it transforms integration from a coordination nightmare into a governance problem that CI can solve. When you're coordinating three teams across two time zones, that shift makes the difference between shipping in weeks versus months.

Full source code: github.com/wallaceespindola/contract-first-integrations

Related reading:
  • OpenAPI 3.0 Specification: spec.openapis.org
  • Confluent Schema Registry: docs.confluent.io/platform/current/schema-registry
  • Flyway Documentation: flywaydb.org/documentation
  • Mastering Contract-First API Development: moesif.com/blog
  • Research on Microservices Issues: arxiv.org

Need more tech insights? Check out my GitHub, LinkedIn, and Speaker Deck. Happy coding!

By Wallace Espindola
Building a Zero-Cost Approval Workflow With AWS Lambda Durable Functions

When AWS announced Lambda Durable Functions at re:Invent 2025, my first reaction was, "Okay, but how is this different from Step Functions?" I have been building serverless workflows on AWS for a while now, and Step Functions has always been my go-to service for orchestrating multi-step pipelines. So naturally, I wanted to put this new capability to the test.

I decided to build a simple document processing workflow (an ETL pipeline with human-in-the-loop approval) using both Durable Functions and Step Functions, then run 1,000 actual document processing workflows through each system. What I found surprised me: not just the cost difference (79% cheaper with Durable Functions), but the trade-offs that nobody is really talking about yet.

In this tutorial, I will walk you through building a zero-cost approval workflow using Lambda Durable Functions with Python. Along the way, I will share the actual cost numbers and the lessons that would have saved me a few hours of debugging.

The Problem: Approval Workflows Are Expensive

If you have ever built a document processing system that requires human approval, you know the pain. Someone uploads a file, your system processes it, and then... it sits there, waiting for a human to review and approve it. That wait can be 5 minutes, 20 minutes, or even hours.

Traditional approaches to handling this waiting are:
  • Polling: Your code keeps checking every 30 seconds ("Is it approved yet? How about now?"), making those calls the entire time.
  • Always-on server: An EC2 instance or ECS container sits idle, costing you money 24/7, just to catch that one approval event.
  • External state management: You build a custom solution with DynamoDB, SQS, and Lambda triggers. It works fine, but it requires you to maintain a state machine you built yourself.

What if your workflow could just... pause? No compute charges. No polling. Just pause, wait for the human to do their thing, and resume exactly where it left off.
That is exactly what Lambda Durable Functions enables with the wait_for_callback pattern.

What We Are Building

Here is the workflow we will implement:

Extract data → Transform data → Load data → Wait for approval (≈20 min) → Finalize & archive

A CSV file gets uploaded to an S3 bucket under the uploads/ prefix. Our durable function picks it up, runs it through three ETL steps (extract, transform, load), then pauses execution and waits for a human to approve the processed data through a shared approval API. Once approved, the function resumes, finalizes the job, and archives the file. The key part? During that 20-minute (or 2-hour, or 2-day) approval wait, you pay absolutely nothing for compute.

Architecture Overview

The project uses three separate SAM stacks:

Markdown shared-resources/ # Approval API, DynamoDB, SNS (shared by both systems) durable-functions/ # Lambda Durable Functions ETL pipeline step-functions/ # Step Functions ETL pipeline (for comparison)

The shared approval handler serves both workflow types through a single API. When an approval comes in, it checks the workflowType field: for durable-functions it calls send_durable_execution_callback_success, and for step-functions it calls send_task_success. Same API endpoint, different callback mechanisms under the hood.

Prerequisites

Before we begin, make sure you have the following:
  • AWS SAM CLI (latest version recommended)
  • Python 3.14 runtime
  • AWS account with Lambda, DynamoDB, S3, SNS, and API Gateway access
  • Docker for local Lambda testing

Check your SAM CLI version:

Markdown sam --version

Step 1: Deploy Shared Resources First

Before the ETL pipeline, we need the shared infrastructure: the approval API, the DynamoDB table for pending approvals, and the SNS topic for notifications.
Here is the shared-resources/ SAM template: YAML # shared-resources/template.yaml AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Shared resources for ETL approval workflow Parameters: ApproverEmail: Type: String Description: Email address to receive approval notifications Default: [email protected] Resources: PendingApprovalsTable: Type: AWS::DynamoDB::Table Properties: TableName: etl-pending-approvals BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: jobId AttributeType: S KeySchema: - AttributeName: jobId KeyType: HASH TimeToLiveSpecification: AttributeName: ttl Enabled: true ApprovalNotificationTopic: Type: AWS::SNS::Topic Properties: TopicName: etl-approval-notifications Subscription: - Endpoint: !Ref ApproverEmail Protocol: email ApprovalApi: Type: AWS::Serverless::Api Properties: Name: ETL-Approval-API StageName: prod ApprovalHandlerFunction: Type: AWS::Serverless::Function Properties: FunctionName: ETL-Approval-Handler CodeUri: ./src Handler: approval_handler.handler Runtime: python3.14 MemorySize: 256 Timeout: 30 Environment: Variables: APPROVALS_TABLE: !Ref PendingApprovalsTable Policies: - DynamoDBCrudPolicy: TableName: !Ref PendingApprovalsTable - Version: '2012-10-17' Statement: - Effect: Allow Action: - states:SendTaskSuccess - states:SendTaskFailure - lambda:SendDurableExecutionCallbackSuccess - lambda:SendDurableExecutionCallbackFailure Resource: '*' Events: ApproveJob: Type: Api Properties: RestApiId: !Ref ApprovalApi Path: /approve/{jobId} Method: POST RejectJob: Type: Api Properties: RestApiId: !Ref ApprovalApi Path: /reject/{jobId} Method: POST GetJobStatus: Type: Api Properties: RestApiId: !Ref ApprovalApi Path: /status/{jobId} Method: GET Notice the approval handler has permissions for both states:SendTaskSuccess (Step Functions) and lambda:SendDurableExecutionCallbackSuccess (Durable Functions). This is the shared handler approach, one API that works with both workflow types. 
Deploy it:

Markdown cd shared-resources sam build sam deploy --guided

Step 2: The Durable Functions SAM Template

Now for the Durable Functions ETL pipeline itself. The key addition is the DurableConfig property, which tells Lambda to enable durable execution for your function.

YAML # durable-functions/template.yaml AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: Lambda Durable Functions ETL Pipeline Globals: Function: Runtime: python3.14 Architectures: - arm64 MemorySize: 512 Timeout: 900 Resources: ETLOrchestratorFunction: Type: AWS::Serverless::Function Properties: FunctionName: ETLDurableOrchestrator CodeUri: ./src Handler: handlers/etl_handler.lambda_handler MemorySize: 1024 Timeout: 900 DurableConfig: ExecutionTimeout: 86400 # 24 hours for human approval RetentionPeriodInDays: 14 # Keep execution history for debugging AutoPublishAlias: live Policies: - AWSLambdaBasicExecutionRole - S3CrudPolicy: BucketName: !Sub "${RawBucketName}-${AWS::AccountId}" - S3CrudPolicy: BucketName: !Sub "${ProcessedBucketName}-${AWS::AccountId}" - DynamoDBCrudPolicy: TableName: !Ref ETLMetadataTable - DynamoDBCrudPolicy: TableName: etl-pending-approvals - SNSPublishMessagePolicy: TopicName: etl-approval-notifications Events: S3Upload: Type: S3 Properties: Bucket: !Ref RawDataBucket Events: s3:ObjectCreated:* Filter: S3Key: Rules: - Name: prefix Value: uploads/ - Name: suffix Value: .csv Environment: Variables: PROCESSED_BUCKET: !Sub "${ProcessedBucketName}-${AWS::AccountId}" METADATA_TABLE: !Ref ETLMetadataTable APPROVALS_TABLE: etl-pending-approvals APPROVAL_TOPIC_ARN: !ImportValue ETL-ApprovalTopicArn APPROVAL_API_URL: !ImportValue ETL-ApprovalApiUrl

A few things to notice here:
  • MemorySize: 1024 on the orchestrator (overrides the 512 MB global default). Since this single function does all the work, it needs more memory.
  • ExecutionTimeout: 86400 – This is the total workflow duration across all invocations (24 hours).
The standard Timeout: 900 is the per-invocation limit (15 minutes). Each checkpoint/resume is a fresh invocation.
  • AutoPublishAlias: live – AWS recommends using Lambda versions with durable functions. If you update code while an execution is suspended, replay will use the version that started the execution.
  • S3 filter with prefix: uploads/ and suffix: .csv – Only CSV files under the uploads/ directory trigger the workflow.
  • The stack pulls in shared resources from the shared stack: the SNS topic ARN and API URL via !ImportValue, and the approvals table by name.

Step 3: Writing the Durable Function

This is where it gets interesting. The entire ETL pipeline, including the approval wait, lives in a single Lambda function. No state machine definition. No JSON DSL. Just Python code.

First, the individual ETL steps. Each one is a regular Python function in a separate file:

Extract

Python import csv import io import boto3 import logging logger = logging.getLogger() s3_client = boto3.client("s3") def extract_data(source_bucket, source_key, step_context=None): logger.info(f"Extracting from s3://{source_bucket}/{source_key}") response = s3_client.get_object(Bucket=source_bucket, Key=source_key) content = response["Body"].read().decode("utf-8") reader = csv.DictReader(io.StringIO(content)) records = list(reader) schema = { "columns": reader.fieldnames, "source_file": source_key, "file_size_bytes": response["ContentLength"] } logger.info(f"Extracted {len(records)} records with {len(schema['columns'])} columns") return {"data": records, "record_count": len(records), "schema": schema}

Transform

Python import logging from datetime import datetime logger = logging.getLogger() def transform_data(raw_data, schema_config, step_context=None): logger.info(f"Transforming {len(raw_data)} records") valid_records, rejected_records = [], [] for i, record in enumerate(raw_data): try: cleaned = {k: v.strip() if isinstance(v, str) else v for k, v in record.items()} if not cleaned.get("id") or not cleaned.get("name"): 
rejected_records.append({"index": i, "reason": "Missing required field"}) continue if "date" in cleaned: cleaned["date"] = normalize_date(cleaned["date"]) cleaned["_processed_at"] = datetime.utcnow().isoformat() for key in ["amount", "quantity", "price"]: if key in cleaned and cleaned[key]: try: cleaned[key] = float(cleaned[key]) except ValueError: cleaned[key] = None valid_records.append(cleaned) except Exception as e: rejected_records.append({"index": i, "reason": str(e)}) return { "data": valid_records, "valid_records": len(valid_records), "rejected_records": len(rejected_records), "rejection_details": rejected_records[:100] } def normalize_date(date_str): for fmt in ["%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y", "%Y/%m/%d"]: try: return datetime.strptime(date_str, fmt).strftime("%Y-%m-%d") except ValueError: continue return date_str Load Python import json import boto3 import logging logger = logging.getLogger() s3_client = boto3.client("s3") def load_data(transformed_data, target_bucket, target_key, step_context=None): logger.info(f"Loading {len(transformed_data)} records to s3://{target_bucket}/{target_key}") output_lines = "\n".join(json.dumps(r) for r in transformed_data) s3_client.put_object( Bucket=target_bucket, Key=target_key, Body=output_lines.encode("utf-8"), ContentType="application/jsonl", Metadata={"record_count": str(len(transformed_data))} ) summary = { "record_count": len(transformed_data), "columns": list(transformed_data[0].keys()) if transformed_data else [], "sample_records": transformed_data[:3] } return {"target_path": f"s3://{target_bucket}/{target_key}", "record_count": len(transformed_data), "summary": summary} Notice the steps are plain Python functions — no special decorator, no SDK import. They take step_context=None as an optional last parameter, which keeps them testable outside the durable execution context. 
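Because the steps are plain functions, they can be exercised with ordinary assertions and no durable runtime. The sketch below copies normalize_date from the transform step so that it is self-contained; the sample inputs and the check_normalize_date helper are made up for illustration.

```python
from datetime import datetime


def normalize_date(date_str):
    """Copy of the transform step's helper: try known formats, else return input unchanged."""
    for fmt in ["%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y", "%Y/%m/%d"]:
        try:
            return datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return date_str


def check_normalize_date():
    """Hypothetical test helper: map sample inputs to their normalized form."""
    samples = ["2024-01-15", "03/04/2024", "25-12-2024", "2024/01/15", "next tuesday"]
    return {s: normalize_date(s) for s in samples}


results = check_normalize_date()
```

Note that an ambiguous value such as 03/04/2024 is read as month/day/year because that format is tried before any day-first format, and unrecognized values pass through unchanged rather than raising.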
Now the main ETL orchestrator that ties it all together: Python import json import os import logging from datetime import datetime from aws_durable_execution_sdk_python import durable_execution, DurableContext from steps.extract import extract_data from steps.transform import transform_data from steps.load import load_data from steps.finalize import finalize_job logger = logging.getLogger() logger.setLevel(logging.INFO) PROCESSED_BUCKET = os.environ.get("PROCESSED_BUCKET") METADATA_TABLE = os.environ.get("METADATA_TABLE") @durable_execution def lambda_handler(event, context: DurableContext): # Handle both S3 event format and direct invocation if "Records" in event: s3_event = event["Records"][0]["s3"] source_bucket = s3_event["bucket"]["name"] source_key = s3_event["object"]["key"] else: source_bucket = event.get("bucket") source_key = event.get("key") # Generate job_id deterministically using context.step() job_id = context.step( lambda _: f"etl-durable-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-" f"{source_key.split('/')[-1]}", name="generate-job-id" ) context.logger.info(f"Starting ETL job: {job_id}") # Step 1: Extract extracted = context.step( lambda _: extract_data(source_bucket, source_key, None), name="extract-data" ) context.logger.info(f"Extracted {extracted['record_count']} records") # Step 2: Transform transformed = context.step( lambda _: transform_data(extracted["data"], extracted.get("schema", {}), None), name="transform-data" ) # Step 3: Load loaded = context.step( lambda _: load_data(transformed["data"], PROCESSED_BUCKET, f"processed/{job_id}/output.jsonl", None), name="load-data" ) # --- EXECUTION PAUSES HERE --- # The submitter function stores the callback_id in DynamoDB # and sends an SNS notification to the reviewer. # No compute charges while waiting for approval. 
def submit_for_approval(callback_id: str, ctx): return notify_reviewer(job_id, callback_id, loaded["summary"]) approval = context.wait_for_callback( submitter=submit_for_approval, name="quality-check-approval" ) # Parse approval result if isinstance(approval, str): approval = json.loads(approval) if not approval or not approval.get("approved"): # approval may be None here, so guard the lookup return {"status": "REJECTED", "job_id": job_id, "reason": (approval or {}).get("reason", "No reason")} # Step 4: Finalize (only runs after approval) final = context.step( lambda _: finalize_job(job_id, source_bucket, source_key, loaded, approval, METADATA_TABLE, None), name="finalize-job" ) return { "status": "COMPLETED", "job_id": job_id, "records_processed": transformed["valid_records"], "output_path": loaded["target_path"], "approved_by": approval.get("reviewer"), "completed_at": final["completed_at"] }

Let me break down the important parts:
  • @durable_execution – This decorator (imported from aws_durable_execution_sdk_python) enables the checkpoint/replay mechanism on the handler.
  • context.step(lambda _: ..., name="...") – Each step call creates a checkpoint. On replay, completed steps return their cached results instantly instead of re-executing.
  • context.wait_for_callback(submitter=..., name="...") – This is the zero-cost waiting magic. The submitter function receives a callback_id, which gets stored in DynamoDB. Execution then pauses completely: Lambda saves the state, shuts down, and you stop paying.
  • Determinism matters – Notice job_id is generated inside a context.step(). That is intentional. Since Lambda replays your function from the beginning on resume, a bare datetime.utcnow() call would produce a different value on each replay. Wrapping it in a step ensures the timestamp gets checkpointed and replayed consistently.
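The checkpoint-and-replay behavior described above can be mimicked with a toy model. The sketch below is not the AWS SDK: ToyContext, now, and handler are hypothetical stand-ins that only show why a value computed inside a named step survives a replay while a value computed outside one does not.

```python
import itertools


class ToyContext:
    """Toy stand-in for a durable context: caches step results by step name."""

    def __init__(self, checkpoints: dict):
        self.checkpoints = checkpoints  # persists across invocations
        self.executed = []              # steps actually run in this invocation

    def step(self, fn, name: str):
        if name in self.checkpoints:
            # Replay: return the stored result without running the function
            return self.checkpoints[name]
        result = fn(None)
        self.checkpoints[name] = result
        self.executed.append(name)
        return result


_clock = itertools.count(1)


def now() -> int:
    """Fake clock that advances on every call, like wall-clock time."""
    return next(_clock)


def handler(ctx: ToyContext):
    unsafe_id = f"job-{now()}"                                        # recomputed on every replay
    safe_id = ctx.step(lambda _: f"job-{now()}", name="generate-job-id")  # checkpointed once
    return unsafe_id, safe_id


store = {}                                 # simulated checkpoint storage
first_ctx = ToyContext(store)
first = handler(first_ctx)                 # initial invocation
replay_ctx = ToyContext(store)
replay = handler(replay_ctx)               # resume: handler runs again from the top
```

After the two runs, the checkpointed ID is identical across both invocations while the unguarded ID has drifted, and replay_ctx.executed is empty because the step was served from the checkpoint store.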
The notify_reviewer function (in the same file) stores the callback details in DynamoDB and sends an SNS notification: Python def notify_reviewer(job_id, callback_id, summary): import boto3 from datetime import timedelta dynamodb = boto3.resource('dynamodb') sns_client = boto3.client('sns') approvals_table = os.environ.get('APPROVALS_TABLE', 'etl-pending-approvals') approval_topic_arn = os.environ.get('APPROVAL_TOPIC_ARN') approval_api_url = os.environ.get('APPROVAL_API_URL') table = dynamodb.Table(approvals_table) ttl = int((datetime.utcnow() + timedelta(hours=24)).timestamp()) table.put_item(Item={ 'jobId': job_id, 'callbackId': callback_id, 'functionArn': os.environ.get('AWS_LAMBDA_FUNCTION_NAME'), 'workflowType': 'durable-functions', 'summary': json.dumps(summary), 'status': 'pending', 'requestedAt': datetime.utcnow().isoformat(), 'ttl': ttl }) if approval_topic_arn: sns_client.publish( TopicArn=approval_topic_arn, Subject=f'ETL Job Approval Required: {job_id}', Message=f"Job ID: {job_id}\n" f"Approve: POST {approval_api_url}/approve/{job_id}\n" f"Reject: POST {approval_api_url}/reject/{job_id}" ) return {"job_id": job_id, "callback_id": callback_id, "status": "pending"} The workflowType: 'durable-functions' field is important — it tells the shared approval handler which callback mechanism to use when the reviewer responds. 
Step 4: The Shared Approval Handler When the reviewer clicks approve, the shared handler looks up the callbackId from DynamoDB and sends the callback to the paused durable execution: Python # shared-resources/src/approval_handler.py (key excerpt) if workflow_type == 'durable-functions': callback_id = approval_record.get('callbackId') if approved: lambda_client.send_durable_execution_callback_success( CallbackId=callback_id, Result=json.dumps(approval_response) ) else: lambda_client.send_durable_execution_callback_failure( CallbackId=callback_id, Error='JobRejected', Cause=reason or 'Job rejected by reviewer' ) elif workflow_type == 'step-functions': task_token = approval_record.get('taskToken') if approved: stepfunctions.send_task_success( taskToken=task_token, output=json.dumps(approval_response) ) Same API, same reviewer experience — the underlying callback mechanism is the only thing that differs. Step 5: Deploy and Test Deploy in order (shared resources first, since the other stacks import from it): Markdown # 1. Deploy shared resources cd shared-resources sam build && sam deploy --guided # 2. 
Deploy Durable Functions cd ../durable-functions sam build && sam deploy --guided Generate test data: Markdown python scripts/generate_test_data.py --count 10 --output test-data/ Upload files to trigger the workflow (note the uploads/ prefix — the S3 filter requires it): Markdown aws s3 cp test-data/ s3://etl-raw-data-bucket-YOUR_ACCOUNT_ID/uploads/ --recursive Check approval status and approve: Markdown # Check status curl https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/status/<job-id> # Approve curl -X POST https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/approve/<job-id> \ -H "Content-Type: application/json" \ -d '{"reviewer": "harpreet", "reason": "Data looks good"}' For bulk approvals during testing, the repo includes a handy script: Markdown ./scripts/approve_all_jobs.sh For local testing, the testing SDK supports pytest: Markdown pip install aws-lambda-durable-execution-sdk-testing pytest durable-functions/tests/ Step 6 (Optional): Deploy Step Functions for Comparison If you want to reproduce my full comparison, deploy the Step Functions stack too: Markdown cd step-functions sam build && sam deploy --guided Here is what the same workflow looks like in Amazon States Language: JSON { "StartAt": "ExtractData", "States": { "ExtractData": { "Type": "Task", "Resource": "${ExtractFunctionArn}", "ResultPath": "$.extractResult", "Next": "TransformData" }, "TransformData": { "Type": "Task", "Resource": "${TransformFunctionArn}", "ResultPath": "$.transformResult", "Next": "LoadData" }, "LoadData": { "Type": "Task", "Resource": "${LoadFunctionArn}", "ResultPath": "$.loadResult", "Next": "WaitForApproval" }, "WaitForApproval": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken", "Parameters": { "FunctionName": "${ApprovalFunctionArn}", "Payload": { "taskToken.$": "$$.Task.Token", "jobId.$": "$.loadResult.job_id", "summary.$": "$.loadResult.summary" } }, "TimeoutSeconds": 86400, "ResultPath": "$.approvalResult", "Next": 
"CheckApproval" }, "CheckApproval": { "Type": "Choice", "Choices": [{ "Variable": "$.approvalResult.approved", "BooleanEquals": true, "Next": "FinalizeJob" }], "Default": "JobRejected" }, "JobRejected": { "Type": "Pass", "Result": { "status": "REJECTED" }, "End": true }, "FinalizeJob": { "Type": "Task", "Resource": "${FinalizeFunctionArn}", "End": true } } } Compare the two approaches. Durable Functions: one Python file, one Lambda, familiar programming constructs. Step Functions: a JSON state machine definition, five separate Lambda functions, plus the ASL learning curve. Both do the same thing. The Real Cost Numbers Now, here is the part that made me rebuild a mental model I had about serverless orchestration costs. I ran 1,000 CSV files through this exact workflow — both with Durable Functions and with the Step Functions implementation. The approval wait averaged about 20 minutes per document. Cost ComponentDurable FunctionsStep FunctionsDifferenceLambda invocations$0.000358$0.001-64%Lambda duration$0.0308$0.0179+72%State transitions$0.000$0.175-100%DynamoDB$0.003$0.0030%S3 operations$0.010$0.0100%TOTAL$0.044$0.207-79% Source: AWS CloudWatch Metrics The total cost, which is 79% cheaper, is mainly driven almost entirely by one thing: state transitions. Step Functions charges $0.025 per 1,000 state transitions. ASL workflow has 7 states (ExtractData, TransformData, LoadData, WaitForApproval, CheckApproval, JobRejected/FinalizeJob). For 1,000 workflows, that is 7,000 transitions, which costs $0.175. That single line (state transition) item is 84% of the total Step Functions cost. Durable Functions eliminates state transition costs. The trade-off? Higher Lambda duration costs ($0.031 vs. $0.018) because the durable function runs with 1,024 MB memory (single function handling all work) compared to Step Functions using 512 MB per function across five smaller functions. 
At scale, the difference adds up quickly:

Daily Volume | Durable Functions/year | Step Functions/year | Annual Savings
1,000/day | $16.06 | $75.56 | $59.50
10,000/day | $160.60 | $755.60 | $595
100,000/day | $1,606 | $7,556 | $5,950

And the most important validation: both systems achieved $0 compute cost during the 20-minute approval wait. That is the real game-changer compared to polling or always-on servers.

Understanding the Replay Model

One thing that confused me initially was the invocation count. I expected 1,000 invocations for 1,000 workflows. Instead, I got 1,788. Here is why.

The checkpoint/replay model means each completed workflow requires a minimum of 2 invocations:
  • Initial invocation: S3 trigger fires, function runs generate-job-id → extract → transform → load → submit-for-approval → pause
  • Resume invocation: Callback received, function replays from the beginning (all completed steps return cached results instantly), then executes the finalize step

So 1,000 fully completed workflows need at least 2,000 invocations. The actual number was 1,788 because some workflows were still pending approval when I collected the metrics over the 24-hour measurement window.

The important thing to remember: your code must be deterministic. Since Lambda replays your function from the beginning on resume, any non-deterministic operations (random numbers, timestamps, external API calls) must happen inside context.step() blocks so their results get checkpointed.

Python job_id = context.step( lambda _: f"etl-durable-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-" f"{source_key.split('/')[-1]}", name="generate-job-id" )

That is exactly why the job_id generation in our code uses context.step(). Without it, the timestamp would change on every replay.
Here are some other common sources of non-determinism and how to handle them:

Non-Deterministic Operation | Why It Breaks | Solution
Math.random() | Different value on every replay | Wrap in context.step()
Date.now() | Time keeps moving forward | Use context.timestamp or wrap in a step
Global variables | Might change between replays | Pass state through function arguments
External API calls | Responses can change or fail between replays | Always wrap in context.step()
Iterating over Map or Set | Iteration order can vary by runtime | Use arrays or ensure stable ordering

When Not to Use Durable Functions

I want to be honest about the trade-offs, because this is not a "Durable Functions is better than everything" story.

Choose Step Functions when:

Visual debugging matters. The Step Functions state machine execution graph is genuinely superior. You can see exactly which step failed, inspect the input/output of each state, and non-technical stakeholders can actually understand what the workflow is doing. Durable Functions also comes with visual analysis, monitoring, and debugging, but it is geared more toward developers.
Multi-service orchestration. Step Functions has 220+ native AWS service integrations, so you can call DynamoDB, SQS, SNS, ECS, and Glue without writing Lambda glue code. In our Step Functions implementation, the ASL connects directly to Lambda function ARNs with built-in retry policies. With Durable Functions, all integrations go through your Lambda code.
Express Workflows apply. For short-duration (under 5 minutes), high-throughput workflows, Step Functions Express Workflows use a different pricing model that can be very competitive.

Choose Durable Functions when:

Cost optimization is the priority (79% savings at scale).
Workflows are Lambda-centric (your logic lives in Lambda code anyway).
You prefer writing orchestration in Python or TypeScript over Amazon States Language. AWS has also just released Lambda Durable Functions for Java in developer preview.
Your logic is complex, and your developers prefer a general-purpose programming language over the declarative ASL.

AWS recommends a hybrid approach: use Durable Functions for application-level logic within Lambda, and Step Functions for high-level orchestration across multiple AWS services.

Concurrency Planning: A Quick Note

One thing worth mentioning: Durable Functions consolidates your entire workflow into a single Lambda function (ETLDurableOrchestrator in our case). This means your Lambda concurrency quota directly limits how many workflows can run simultaneously. Step Functions distributes execution across five separate Lambda functions (Extract, Transform, Load, Approval, Finalize), spreading the concurrency demand.

In practice, this means you should plan your Lambda concurrency quotas carefully when using Durable Functions. If you expect burst uploads of hundreds or thousands of files at once, set reserved concurrency appropriately for your workload. This applies to both services; the difference is just where the concurrency demand concentrates.

Wrapping Up

Lambda Durable Functions is a genuinely useful addition to the serverless toolkit. For a simple ETL pipeline with human-in-the-loop approval, it delivered 79% cost savings over Step Functions while achieving the same 100% success rate and zero-cost waiting.

The code-first approach feels natural if you are already comfortable writing Lambda functions in Python, TypeScript, or Java. The wait_for_callback pattern for human approvals is clean and straightforward. And the cost savings are real, driven by the elimination of state transition charges.

That said, Step Functions remains the better choice when visual workflow representation, multi-service orchestration, or operational simplicity are your priorities. There is no universal winner here; it depends on what your team values more.
The complete implementation — both SAM stacks, shared approval infrastructure, test data generation scripts, bulk approval scripts, and detailed cost analysis — is available here: github.com/hsiddhu2/aws-lambda-durable-vs-stepfunctions. Clone it, deploy both implementations, run your own 1,000-file comparison, and see the numbers for yourself. The ~79% cost advantage held consistent for this workflow, but your number will vary based on workflow complexity and state count.

By Harpreet Siddhu
Kafka and Spark Structured Streaming in Enterprise: The Patterns That Hold Up Under Pressure

I've been running Kafka and Spark Structured Streaming together in production for about five years. Not in demo environments or proof-of-concept projects. In systems processing insurance claims, manufacturing telemetry, and financial transaction data, with SLAs and compliance requirements, and people who call you at 2 AM when things break.

There's a version of Kafka plus Spark Structured Streaming that looks elegant in architecture diagrams and falls apart in the first month of production. There's another version that's uglier in places but genuinely reliable. Here is what I've learned about the difference.

Getting Checkpointing Right From the Start

In my experience, checkpointing is non-negotiable for any streaming job that needs recovery. But checkpointing to local disk, which is the easiest configuration, means your streaming job can't recover from a node failure, only from a process restart. Checkpoint location must be on durable shared storage, ADLS Gen2 or equivalent, from the first day in production.

The checkpoint contains the Kafka offsets that have been committed and the state store for stateful operations. Changing either of these, whether by manually deleting the checkpoint or by changing the query name, will reset your consumer offsets. I've seen this happen accidentally twice: once when an engineer thought deleting a stale checkpoint directory was a cleanup operation, and once when a code refactoring changed the query name used as the checkpoint key. Both required manual offset reconstruction from Kafka's own offset storage. Neither was catastrophic, but both were stressful and avoidable.

Micro-Batch Sizing for Your Use Case

Spark Structured Streaming processes data in micro-batches. The trigger interval controls how often a micro-batch runs. The default, if you don't specify a trigger, is to run a new batch immediately after the previous one completes. This is correct for high-throughput workloads where you want to process data as fast as possible.
It's wrong for moderate-throughput workloads where you want predictable latency and manageable file sizes in your output Delta Lake tables.

For our manufacturing telemetry pipeline (moderate throughput, near-real-time requirement), we use a 30-second trigger. This produces files of roughly 50 to 100MB in the output Delta table, which is manageable with a nightly compaction job. For our insurance claims pipeline (lower throughput, 5-minute SLA), we use a 2-minute trigger.

My rule of thumb: choose a trigger interval that produces output files in the 50 to 500MB range for your throughput. Files significantly smaller than this create compaction debt. Files significantly larger than this create memory pressure during the micro-batch.

Python
# Trigger interval examples for different workloads
# (each query needs its own checkpoint_path and output_path)

# High throughput: omit the trigger so each micro-batch starts as soon as
# the previous one finishes. trigger(availableNow=True), Spark 3.3+, is
# different: it processes everything available and then stops.
high_throughput_query = (
    df.writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .start(output_path)
)

# Moderate throughput (manufacturing telemetry): 30-second batches
telemetry_query = (
    df.writeStream
    .trigger(processingTime="30 seconds")
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .start(output_path)
)

# Low throughput (insurance claims): 2-minute batches
claims_query = (
    df.writeStream
    .trigger(processingTime="2 minutes")
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .start(output_path)
)

Kafka Partition Count and Spark Parallelism

Each Kafka partition is consumed by one Spark task per micro-batch. If your topic has 8 partitions, Spark uses 8 tasks for the Kafka read stage. If your downstream processing is more CPU-intensive than the Kafka read, you'll want more parallelism downstream. Use repartition() after the Kafka source read to increase parallelism for the heavy processing stages.
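A rough way to reason about partition count against cluster size is to count how many scheduling "waves" the Kafka read stage needs. The sketch below is back-of-the-envelope arithmetic, not a Spark API; it assumes one task per Kafka partition and one core per task, and the function name is made up for illustration.

```python
import math
from typing import Tuple


def read_stage_waves(kafka_partitions: int, executor_cores: int) -> Tuple[int, int]:
    """Return (waves, idle_cores_in_last_wave) for the Kafka read stage.

    Assumes one task per Kafka partition and one core per task.
    """
    if kafka_partitions <= 0 or executor_cores <= 0:
        raise ValueError("partitions and cores must be positive")
    waves = math.ceil(kafka_partitions / executor_cores)
    tasks_in_last_wave = kafka_partitions - (waves - 1) * executor_cores
    return waves, executor_cores - tasks_in_last_wave


# Few partitions on a larger cluster: one wave, most cores idle during the read,
# which is the case where repartition() helps the downstream stages.
few = read_stage_waves(kafka_partitions=8, executor_cores=32)

# Many partitions on the same cluster: several waves per micro-batch.
many = read_stage_waves(kafka_partitions=200, executor_cores=32)

print(few, many)
```

The first case is the one where repartitioning after the read pays off; the second suggests asking whether the topic is over-partitioned for the cluster.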
In the other direction: if your Kafka topic has 200 partitions because it was sized for high throughput, but your Spark cluster has 32 cores, Spark has to schedule those 200 tasks in roughly seven waves across 32 cores, with per-task scheduling overhead on every micro-batch. Consider whether the partition count on the topic is appropriate for your actual throughput.

Stateful Operations and Watermarks

Windowed aggregations and stream-stream joins require Spark to maintain state across micro-batches. Without a watermark, Spark will accumulate state indefinitely, and your executor memory will grow without bound until the job fails. Always define a watermark on your event-time column for any stateful operation.

The watermark threshold is a business decision as much as a technical one. A 10-minute watermark means Spark will discard events whose event time is more than 10 minutes behind the latest event time it has seen. If your source systems can deliver events up to 30 minutes late (common in some IoT and batch-settlement scenarios), a 10-minute watermark will cause late events to be silently dropped. Understand your source latency characteristics before setting the watermark.

Python
# Watermark definition for late-arriving events
from pyspark.sql.functions import avg, col, count, from_json, window
from pyspark.sql.functions import sum as sum_  # avoid shadowing Python's built-in sum

claims_parsed = (
    kafka_df
    .select(from_json(col("value").cast("string"), claims_schema).alias("data"))
    .select("data.*")
    .withWatermark("event_timestamp", "30 minutes")
)

# Windowed aggregation with watermark
claims_hourly = (
    claims_parsed
    .groupBy(window("event_timestamp", "1 hour"), "claim_type", "region")
    .agg(
        count("claim_id").alias("claim_count"),
        sum_("claim_amount").alias("total_amount"),
        avg("claim_amount").alias("avg_amount"),
    )
)

The Monitoring You Need on Day One

First up: consumer lag per partition. This is the most important streaming metric. Growing lag means your consumer can't keep up with producer throughput, and your latency SLA is in jeopardy.

Second: micro-batch duration.
If micro-batch duration exceeds your trigger interval, you have a processing bottleneck. The job is trying to run continuously without keeping up.

Third: state store size for stateful operations. A growing state store is a memory leak waiting to become an OOM failure.

My team emits these three metrics from every streaming job to Azure Monitor. When any of them crosses a threshold, we get an alert before users notice a problem. Setting this up properly at deployment time, not after the first production incident, has saved us from several avoidable outages.

Python
# Azure Monitor metrics emission from Spark streaming
import json
from datetime import datetime, timezone

from pyspark.sql.streaming import StreamingQueryListener
from opencensus.ext.azure import metrics_exporter


class StreamingMetricsListener(StreamingQueryListener):
    def __init__(self, app_insights_key):
        self.exporter = metrics_exporter.new_metrics_exporter(
            connection_string=f"InstrumentationKey={app_insights_key}")

    # PySpark requires the started/terminated callbacks to be defined as well
    def onQueryStarted(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

    def onQueryProgress(self, event):
        p = event.progress
        if p.sources:
            self.emit("consumer_lag", self.kafka_lag(p.sources[0]))
        self.emit("batch_duration_ms", p.batchDuration)
        self.emit("state_store_rows",
                  p.stateOperators[0].numRowsTotal if p.stateOperators else 0)

    @staticmethod
    def kafka_lag(source):
        # Assumes the Kafka source reports offsets as JSON strings such as
        # {"topic": {"0": 1234}}; lag = latest available minus consumed.
        latest = json.loads(source.latestOffset)
        end = json.loads(source.endOffset)
        return sum(latest[t][part] - end[t][part] for t in end for part in end[t])

    def emit(self, name, value):
        # Send to Azure Monitor / Application Insights
        # (simplified payload; check the metric type your exporter expects)
        self.exporter.export_metrics([{
            "name": f"streaming.{name}",
            "value": value,
            "timestamp": datetime.now(timezone.utc)
        }])


spark.streams.addListener(StreamingMetricsListener(AI_KEY))

By Kuladeep Sandra
Exactly-Once Processing: Myth vs Reality

Exactly-once processing (EOP) is often touted as the gold standard for reliability in distributed systems. The promise of processing each message just once seems perfect, whether you're developing financial systems, real-time analytics pipelines, or event-driven microservices. But the truth is much more complex. What most systems refer to as "exactly once" is not an absolute; it is an approximation that rests on trade-offs, limitations, and assumptions. To help you build systems that are accurate, robust, and practical, this article dissects exactly-once processing.

The Myth

The myth is straightforward: if the broker supports exactly-once semantics, the entire application must run exactly once. Similarly, it is frequently assumed that if a stream processor can restore state from checkpoints, the entire pipeline is exactly-once. Another widespread misconception is that if a database transaction commits only once, the user-visible result must also have happened exactly once. When analyzed across a whole, real-world system, none of these claims actually hold.

The Reality

This is a more truthful statement:

A broker can provide exactly-once writes between producers and broker-managed topics.
Within its checkpoint boundary, a stream processor can provide exactly-once state transitions.
A database can make one local transaction atomic.
An end-to-end business workflow is effectively-once only when idempotency and replay-safety are implemented across all boundaries.

That is more than a matter of semantics; it is designing for failure.

Three Important Questions

A useful design review asks three distinct questions:

Can the same input be delivered more than once?
Can the same computation be attempted more than once?
Can a business effect occur more than once?

The design is typically sound if the first two questions are answered yes and the third is answered no.
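The three questions can be made concrete with a toy sink. The sketch below is illustrative only: the `payments` table, the `apply_payment` helper, and the message IDs are invented for the example, and an in-memory SQLite database stands in for a real sink. The input is delivered twice and the handler runs twice, yet the business effect is recorded once because the write is keyed by message ID.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payments (message_id TEXT PRIMARY KEY, amount_cents INTEGER NOT NULL)"
)


def apply_payment(message_id: str, amount_cents: int) -> bool:
    """Apply a payment once per message_id; return True only if it took effect."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO payments (message_id, amount_cents) VALUES (?, ?)",
            (message_id, amount_cents),
        )
    return cur.rowcount == 1


# Question 1: the same input arrives twice.
deliveries = [("msg-42", 1999), ("msg-42", 1999)]

# Question 2: the handler is attempted twice.
results = [apply_payment(mid, amt) for mid, amt in deliveries]

# Question 3: the business effect is visible once.
total = conn.execute(
    "SELECT COALESCE(SUM(amount_cents), 0) FROM payments"
).fetchone()[0]

print(results, total)
```

The primary key on `message_id` does the deduplication here; an upsert or a compare-and-set version check would serve the same purpose.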
Definition

Before debating "exactly once," clarify precisely what it means at each layer:

Semantic | Definition
At-most-once | A message may be lost, but once acknowledged it is not retried. No duplicates, possible data loss.
At-least-once | A message is retried until acknowledged. No data loss, but duplicates are possible.
Exactly-once within a boundary | A subsystem guarantees one logical input results in one durable state transition relative to its recovery model.
Effectively-once | Duplicate deliveries may occur, but the final business effect is idempotent, so users observe the action exactly once.

The first hard fact is that exactly-once is nearly always local rather than global.

Where the Boundaries Actually Break

Local exactly-once guarantees stop at subsystem boundaries unless the next hop is also replay-safe. Every boundary has its own notion of acknowledgment, retry, timeout, crash recovery, replay, and deduplication. A strong guarantee at one boundary does not automatically compose with the next.

Boundary | Mechanism | Guarantee Level | Failure Risk
Producer to Broker | Producer ID + sequence numbers | Idempotent append | Limited to broker partitions
Broker to Consumer | Consumer group offsets | At-least-once delivery | Crash leads to replay
Consumer to DB | ACID transaction + unique key | Idempotent sink write | No dedupe causes duplicates
DB to Outbox | Same ACID transaction | Atomic state + event | Relay may republish
Outbox to Downstream Broker | Event ID deduplication by consumer | At-least-once publish | Consumer must handle dedupe
Service to External API | Idempotency key per request | Effectively-once | Timeout creates uncertainty

Why Duplicates Are Common Rather Than Unusual

Duplicates typically come from recovery behavior:

After a send timeout, a producer retries because it cannot distinguish between failure and success.
A consumer writes to a sink and crashes before the offset is committed.
A processor reruns a portion of the input after restoring from a checkpoint.
A relay republishes an outbox record because the publish acknowledgment was not observed.
An HTTP client retries after a timeout even though the remote service has already applied the request.

None of those actions is strange. They are precisely what well-behaved distributed software does under uncertainty.

Technical Reality at Each Layer

Producer Semantics

At the producer layer, exactly-once usually means idempotent append behavior to the broker, not magical downstream correctness. For a broker such as Kafka, an idempotent producer can use:

Producer ID
Sequence numbers
Partition-level ordering
Broker-side dedupe of retried sends

A minimal producer configuration looks like:

Properties
enable.idempotence=true
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5

This is important. It stops a common type of duplicate append caused by producer retries. However, it only covers records written to Kafka partitions. It says nothing about:

A consumer writing twice to PostgreSQL
A webhook firing twice
An email being sent twice

Kafka Producer Details: Idempotence Versus Transactions

Kafka provides two distinct tools that are frequently conflated into a single idea:

Idempotent producer semantics
Transactional producer semantics

Idempotence protects retried appends to a partition. Transactions group multiple writes and offset commits into a single broker-visible unit.
The following are typical transactional producer settings:

Properties
enable.idempotence=true
transactional.id=orders-service-1
acks=all

And the application flow is conceptually:

Java
producer.initTransactions();
producer.beginTransaction();
producer.send(outputRecord1);
producer.send(outputRecord2);
// Commit the consumed offsets in the same transaction as the output records
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();

This solves one specific problem: if the application consumes from Kafka and produces back to Kafka, output records and consumed offsets become visible atomically within Kafka. But take note of what's absent:

No database write is included
No webhook is included
No Redis mutation is included
No email is included

Kafka transactions are genuine, but their scope is Kafka-managed state rather than arbitrary external state.

Broker and Consumer Semantics

When the consumer's progress marker lags behind a durable write it has already completed, the broker may redeliver the record. The classic race is:

Consumer reads message
Consumer writes to sink
Process crashes before offset commit
Consumer restarts
Broker redelivers

The business effect occurs twice if the sink write is not idempotent. For this reason, "broker exactly once" and "application exactly once" are not interchangeable terms.

Kafka Consumer Details: Offsets Are Not Business State

Treating the consumer offset as evidence that the business action occurred exactly once is the root cause of many application bugs. It isn't. The offset is only evidence of the consumer group's progress. These are distinct facts:

The broker knows a record was consumed by a consumer group
The application knows a sink mutation was committed
The business knows the effect should be visible once

Those facts line up only when the application explicitly aligns them.

Additionally, "isolation.level=read_committed" is frequently misinterpreted on the read side. It stops consumers from seeing transactional writes that Kafka producers have aborted.
Although that is helpful, it does not make every downstream effect exactly-once. It simply means:

Committed Kafka transactions are visible
Aborted Kafka transactions are hidden

It does not mean:

Downstream database writes cannot duplicate
External side effects cannot duplicate
Application retries are automatically idempotent

Because of this, consumer correctness nearly always requires one of:

Idempotent sink keys
Inbox dedupe tables
Sink-side upserts
Compare-and-set version checks
Business-key uniqueness constraints

The Honest Way to Document Guarantees

Rather than writing "the system is exactly once," use scoped statements for each boundary in your documentation:

Boundary | Guarantee Statement
Producer to Kafka | Producer writes to Kafka are idempotent per partition.
Processor State | State transitions are exactly-once relative to checkpoint recovery.
Primary Database Writes | Writes are idempotent by business key and message ID.
Outbox Publication | Publication is at-least-once; downstream consumers must dedupe by event ID.
Webhook / SaaS APIs | Integrations rely on idempotency keys and replay-safe receivers.
End-to-End Behavior | Business behavior is effectively-once under replay and recovery.

Final Thoughts

If you make the promise specific and local, exactly-once processing is not a fiction. Within clearly specified constraints, databases, Kafka, Flink, and other subsystems can unquestionably offer exactly-once guarantees. The misconception is that those local guarantees automatically compose into a single end-to-end promise that spans brokers, processors, databases, outboxes, caches, webhooks, and external APIs.
The robust architecture in real systems is typically:

At-least-once delivery
Transactional local state
Idempotent sinks
Outbox or inbox patterns
Idempotency keys for side effects
Explicit documentation of every guarantee boundary

That design is less marketable than claiming "exactly once everywhere," but it is the version that survives node draining, broker failovers, replay after restore, and ambiguous network timeouts.
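As an illustration of how the outbox and inbox patterns in that list fit together, here is a minimal sketch using in-memory SQLite. Everything in it is assumed for the example: the `orders`, `outbox`, and `inbox` tables, the helper names, and the list standing in for an email side effect. For brevity the consumer's inbox lives in the same database as the producer's outbox, which would not be the case across real services.

```python
import sqlite3
import uuid
from typing import List, Tuple

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE orders (order_id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (event_id TEXT PRIMARY KEY, payload TEXT NOT NULL);
    CREATE TABLE inbox  (event_id TEXT PRIMARY KEY);
    """
)


def place_order(order_id: str) -> None:
    """Write business state and its event in one local transaction."""
    with db:
        db.execute("INSERT INTO orders VALUES (?, 'PLACED')", (order_id,))
        db.execute("INSERT INTO outbox VALUES (?, ?)", (str(uuid.uuid4()), order_id))


def relay() -> List[Tuple[str, str]]:
    """At-least-once relay: may hand out the same outbox rows more than once."""
    return db.execute("SELECT event_id, payload FROM outbox").fetchall()


emails_sent: List[str] = []  # stand-in for an external side effect


def handle(event_id: str, payload: str) -> None:
    """Downstream consumer: dedupe by event ID before performing the side effect."""
    with db:
        cur = db.execute("INSERT OR IGNORE INTO inbox VALUES (?)", (event_id,))
        if cur.rowcount == 1:  # first time this event ID has been seen
            emails_sent.append(payload)


place_order("order-1")
for event_id, payload in relay() + relay():  # simulated republish after a lost ack
    handle(event_id, payload)

print(emails_sent)
```

Note that a real email call cannot be rolled back with the inbox insert, so an external side effect like this still needs its own idempotency key on the receiving side.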

By Irullappan irulandi
Building Enterprise-Grade Real-Time IoT Dashboards with Vue 3, MQTT, and Kafka

The convergence of IoT, real-time data streaming, and modern frontend frameworks is redefining how engineers build enterprise monitoring systems. Over the course of designing and leading the Device IoT Platform, an enterprise-grade solution for real-time monitoring, configuration, and diagnostics of thousands of distributed network devices, I encountered and solved a core architectural challenge: how do you build a frontend dashboard that can handle hundreds of concurrent device telemetry streams without sacrificing performance, maintainability, or user experience?

This article shares the architectural patterns, technology decisions, and hard-won lessons from that journey, covering the full stack from MQTT ingestion to Vue 3 reactivity to Kafka-backed event processing.

The Core Problem: Real-Time at Scale

Most developers are familiar with polling: periodically fetching data from an API endpoint. For IoT, polling is fundamentally broken:

Latency: A 5-second polling interval means 5 seconds of stale state.
Inefficiency: You're requesting data even when nothing has changed.
Scale: 1,000 devices × 1 request/5s = 200 requests/second just to read status, before any user interaction.

The solution is event-driven architecture: devices push telemetry when something changes, and the platform reacts. This requires a rethinking of both backend ingestion and frontend state management.
Architecture Overview

Plain Text
[IoT Devices]
      |
MQTT Broker (Mosquitto / AWS IoT Core)
      |
[Node.js Telemetry Microservice]
      |
[Kafka Topic: device.telemetry.raw]
      |  (stream processor)
[Kafka Topic: device.telemetry.enriched]
      |
[WebSocket Server (Node.js)]
      |
[Vue 3 Dashboard Frontend]

Each layer has a distinct responsibility:

MQTT Broker handles lightweight publish/subscribe with devices using minimal overhead.
Node.js microservices bridge MQTT → Kafka, performing initial validation and normalization.
Kafka provides durable, replayable event streams, which are critical for audit trails and late-joining consumers.
WebSocket server fans out enriched telemetry to connected dashboard clients in real time.
Vue 3 handles reactive rendering, ensuring only the affected UI components re-render when new data arrives.

Backend: MQTT → Kafka Bridge in Node.js

The heart of the ingestion pipeline is a lightweight Node.js service using the mqtt and kafkajs libraries.

TypeScript
import mqtt from 'mqtt';
import { Kafka } from 'kafkajs';

const mqttClient = mqtt.connect(process.env.MQTT_BROKER_URL!, {
  clientId: `telemetry-bridge-${process.pid}`,
  username: process.env.MQTT_USERNAME,
  password: process.env.MQTT_PASSWORD,
  clean: true,
});

const kafka = new Kafka({ clientId: 'iot-bridge', brokers: [process.env.KAFKA_BROKER!] });
const producer = kafka.producer();

mqttClient.on('connect', async () => {
  await producer.connect();
  mqttClient.subscribe('devices/+/telemetry', { qos: 1 });
  console.log('MQTT → Kafka bridge active');
});

mqttClient.on('message', async (topic, payload) => {
  // Topic shape: devices/<deviceId>/telemetry
  const deviceId = topic.split('/')[1];
  try {
    const data = JSON.parse(payload.toString());
    await producer.send({
      topic: 'device.telemetry.raw',
      messages: [
        {
          key: deviceId,
          value: JSON.stringify({ deviceId, timestamp: Date.now(), ...data }),
        },
      ],
    });
  } catch (err) {
    // A malformed payload or failed send must not crash the bridge
    console.error(`Dropping telemetry from ${deviceId}`, err);
  }
});

Key design decisions here:

QoS Level 1: ensures at-least-once delivery for telemetry messages.
For command acknowledgments, we use QoS 2.
Device ID as Kafka partition key: guarantees ordering per device while allowing parallel processing across partitions.
Separation of raw vs. enriched topics: the device.telemetry.raw topic contains the bare payload; a downstream stream processor enriches it with device metadata, geolocation, and alert thresholds before publishing to device.telemetry.enriched.

WebSocket Fan-Out Server

The WebSocket layer subscribes to Kafka's enriched topic and pushes updates to connected browser clients. We use Kafka consumer groups to allow horizontal scaling of the WebSocket tier.

TypeScript
import WebSocket, { WebSocketServer } from 'ws';
import { Kafka } from 'kafkajs';

const wss = new WebSocketServer({ port: 8080 });
const kafka = new Kafka({ clientId: 'ws-fanout', brokers: [process.env.KAFKA_BROKER!] });
const consumer = kafka.consumer({ groupId: 'websocket-fanout-group' });

// Track subscriptions: deviceId → Set<WebSocket>
const deviceSubscriptions = new Map<string, Set<WebSocket>>();

wss.on('connection', (ws) => {
  ws.on('message', (msg) => {
    try {
      const { action, deviceId } = JSON.parse(msg.toString());
      if (action === 'subscribe') {
        if (!deviceSubscriptions.has(deviceId)) {
          deviceSubscriptions.set(deviceId, new Set());
        }
        deviceSubscriptions.get(deviceId)!.add(ws);
      }
    } catch {
      // Ignore malformed client messages instead of crashing the server
    }
  });

  ws.on('close', () => {
    deviceSubscriptions.forEach((clients) => clients.delete(ws));
  });
});

async function startKafkaConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'device.telemetry.enriched' });
  await consumer.run({
    eachMessage: async ({ message }) => {
      const payload = JSON.parse(message.value!.toString());
      const clients = deviceSubscriptions.get(payload.deviceId);
      clients?.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
          client.send(JSON.stringify(payload));
        }
      });
    },
  });
}

startKafkaConsumer();

This design enables selective subscription: a dashboard user viewing 50 devices only receives telemetry for those 50 devices, not the full
firehose. This is critical for performance at scale.

Frontend: Vue 3 Reactive Architecture

The frontend is built with Vue 3 Composition API + Pinia for state management. The goal is to update only the UI components bound to a specific device when its telemetry arrives, not re-render the entire dashboard.

WebSocket Composable

TypeScript
// composables/useDeviceTelemetry.ts
import { onMounted, onUnmounted } from 'vue';
import { useDeviceStore } from '@/stores/deviceStore';

export function useDeviceTelemetry(deviceIds: string[]) {
  const store = useDeviceStore();
  let ws: WebSocket | null = null;
  let reconnectAttempts = 0;
  let shouldReconnect = true;

  const connect = () => {
    ws = new WebSocket(import.meta.env.VITE_WS_URL);

    ws.onopen = () => {
      reconnectAttempts = 0; // reset backoff after a successful connection
      deviceIds.forEach((id) => {
        ws!.send(JSON.stringify({ action: 'subscribe', deviceId: id }));
      });
    };

    ws.onmessage = (event) => {
      const telemetry = JSON.parse(event.data);
      store.updateDeviceTelemetry(telemetry.deviceId, telemetry);
    };

    ws.onclose = () => {
      if (!shouldReconnect) return; // closed deliberately on unmount
      // Exponential backoff reconnection, capped at 30 seconds
      setTimeout(connect, Math.min(1000 * 2 ** reconnectAttempts++, 30000));
    };
  };

  onMounted(connect);
  onUnmounted(() => {
    shouldReconnect = false;
    ws?.close();
  });
}

Pinia Store with Fine-Grained Reactivity

TypeScript
// stores/deviceStore.ts
import { defineStore } from 'pinia';
import { reactive } from 'vue';

interface DeviceTelemetry {
  deviceId: string;
  status: 'online' | 'offline' | 'degraded';
  signalStrength: number;
  latency: number;
  lastSeen: number;
  alerts: string[];
}

export const useDeviceStore = defineStore('devices', () => {
  const telemetryMap = reactive<Record<string, DeviceTelemetry>>({});

  function updateDeviceTelemetry(deviceId: string, data: Partial<DeviceTelemetry>) {
    if (!telemetryMap[deviceId]) {
      telemetryMap[deviceId] = {} as DeviceTelemetry;
    }
    // Merge in place so only the changed properties trigger updates
    Object.assign(telemetryMap[deviceId], data);
  }

  return { telemetryMap, updateDeviceTelemetry };
});

Using reactive() with a map structure means Vue's dependency tracking is at the property level: a component subscribed to telemetryMap['device-001'].signalStrength won't
re-render when device-002's data changes. This is the key to dashboard scalability.

Device Card Component

Vue
<!-- components/DeviceCard.vue -->
<template>
  <div class="device-card" :class="statusClass">
    <div class="device-header">
      <span class="device-id">{{ deviceId }}</span>
      <StatusBadge :status="telemetry?.status" />
    </div>
    <div class="metrics">
      <MetricBar label="Signal" :value="telemetry?.signalStrength" unit="dBm" />
      <MetricBar label="Latency" :value="telemetry?.latency" unit="ms" />
    </div>
    <AlertList :alerts="telemetry?.alerts ?? []" />
  </div>
</template>

<script setup lang="ts">
import { computed } from 'vue';
import { useDeviceStore } from '@/stores/deviceStore';

const props = defineProps<{ deviceId: string }>();
const store = useDeviceStore();

// Only this device's slice of state, so re-renders stay targeted
const telemetry = computed(() => store.telemetryMap[props.deviceId]);

const statusClass = computed(() => ({
  'status-online': telemetry.value?.status === 'online',
  'status-offline': telemetry.value?.status === 'offline',
  'status-degraded': telemetry.value?.status === 'degraded',
}));
</script>

Performance Optimizations

1. Virtual Scrolling for Large Device Lists

When monitoring 500+ devices, rendering all device cards simultaneously tanks performance. We use vue-virtual-scroller to render only the visible cards:

Vue
<RecycleScroller
  class="device-list"
  :items="filteredDevices"
  :item-size="120"
  key-field="deviceId"
  v-slot="{ item }"
>
  <DeviceCard :device-id="item.deviceId" />
</RecycleScroller>

2. Debounced Batch Updates

Devices can emit bursts of telemetry. Updating the Pinia store on every single message causes excessive re-renders.
We batch incoming messages within a 100ms window:

TypeScript
let pendingUpdates: Record<string, Partial<DeviceTelemetry>> = {};
let batchTimeout: ReturnType<typeof setTimeout> | null = null;

function queueUpdate(deviceId: string, data: Partial<DeviceTelemetry>) {
  // Merge with anything already queued for this device
  pendingUpdates[deviceId] = { ...(pendingUpdates[deviceId] ?? {}), ...data };

  if (!batchTimeout) {
    batchTimeout = setTimeout(() => {
      Object.entries(pendingUpdates).forEach(([id, update]) => {
        store.updateDeviceTelemetry(id, update);
      });
      pendingUpdates = {};
      batchTimeout = null;
    }, 100);
  }
}

3. Lazy Loading and Code Splitting

Device diagnostic panels (charts, event logs, configuration editors) are loaded on demand:

TypeScript
const DeviceDiagnostics = defineAsyncComponent(
  () => import('@/components/DeviceDiagnostics.vue')
);

Combined with route-level code splitting, the initial bundle stays under 200KB gzipped.

Security Architecture: OAuth 2.0 + RBAC

Device management platforms require fine-grained access control. Not every user should be able to issue firmware update commands to production devices.
JWT Claims-Based RBAC

We encode role information directly in the JWT access token:

JSON
{
  "sub": "user-123",
  "roles": ["device:read", "device:configure"],
  "scope": "region:us-east",
  "exp": 1699999999
}

The frontend reads these claims to conditionally render action buttons, and the backend validates them on every API call:

TypeScript
// middleware/rbac.ts
import type { Request, Response, NextFunction } from 'express';

export function requirePermission(permission: string) {
  return (req: Request, res: Response, next: NextFunction) => {
    const token = req.headers.authorization?.split(' ')[1];
    if (!token) {
      return res.status(401).json({ error: 'Missing bearer token' });
    }
    try {
      // verifyJWT is the application's own token verification helper
      const decoded = verifyJWT(token);
      if (!decoded.roles.includes(permission)) {
        return res.status(403).json({ error: 'Insufficient permissions' });
      }
    } catch {
      return res.status(401).json({ error: 'Invalid token' });
    }
    next();
  };
}

// Route definition
router.post('/devices/:id/firmware', requirePermission('device:firmware'), handleFirmwareUpdate);

Deployment: CI/CD on AWS

The entire platform is containerized and deployed via a GitLab CI/CD pipeline to AWS ECS with Fargate.

YAML
# .gitlab-ci.yml (excerpt)
stages:
  - test
  - build
  - deploy

build-and-push:
  stage: build
  script:
    - docker build -t $ECR_REGISTRY/iot-frontend:$CI_COMMIT_SHA .
    - docker push $ECR_REGISTRY/iot-frontend:$CI_COMMIT_SHA

deploy-production:
  stage: deploy
  script:
    - aws ecs update-service --cluster iot-platform --service frontend --force-new-deployment
  environment: production
  only:
    - main

Blue-green deployments ensure zero downtime for this 24/7 critical infrastructure platform.

Results and Key Metrics

After migrating from a polling-based architecture to this event-driven stack:

Dashboard latency: reduced from 5–10 seconds (polling) to under 200ms (WebSocket push).
Backend API load: reduced by ~78%, as telemetry pushes replaced constant polling.
Frontend bundle size: kept under 220KB gzipped through lazy loading and tree-shaking.
Throughput: validated at 10,000 concurrent telemetry events/second through Kafka partitioning.
Conclusion

Building a real-time IoT dashboard at enterprise scale requires rethinking the entire data flow, from device communication protocols through streaming pipelines to fine-grained frontend reactivity. The combination of MQTT for lightweight device communication, Kafka for durable event streaming, WebSockets for real-time push to browsers, and Vue 3's targeted reactivity model creates a system that scales gracefully without sacrificing developer ergonomics.

The patterns described here (selective WebSocket subscriptions, batched Pinia updates, virtual scrolling, and JWT-based RBAC) have been validated in production on a platform serving critical network infrastructure. They are broadly applicable to any domain requiring real-time monitoring at scale: energy management, fleet tracking, industrial automation, and beyond.

GitHub: Real-Time-IoT-Dashboards-Vue-3-MQTT-Kafka

By Venkata Sandeep Dhullipalla

Top Big Data Experts

Miguel Garcia
VP of Engineering, Factorial
Miguel has a great background in leading teams and building high-performance solutions for the retail sector. An advocate of platform design as a service and data as a product.

Gautam Goswami
Founder, DataView
Enthusiastic about learning and sharing knowledge on Big Data, Data Science, and related advances, including data streaming platforms, through the knowledge-sharing platform Dataview.in. Currently Head of Engineering & Data Streaming at Irisidea TechSolutions, Bangalore, India. https://www.irisidea.com/gautam-goswami/

Ram Ghadiyaram
Vice President - Fintech / Cloud / Big Data / Analytics / AI & ML, JPMorgan Chase & Co.
Fintech | Cloud | Big Data Analytics | AI & ML Expert. Venkata Ram Anjaneya Prasad Gadiyaram (aka Ram Ghadiyaram) is a seasoned Cloud, Big Data analytics, and AI/ML expert, mentor, and innovator. Open source lover :-)

The Latest Big Data Topics

Fine-Tuning LLMs at Scale With Databricks MLflow and Spark
Learn how Databricks, Apache Spark, MLflow, and Hugging Face Transformers work together to create an end-to-end fine-tuning platform.
June 30, 2026 · by Jubin Abhishek Soni (DZone Core) · 72 Views

Data Pipeline Observability: Why Your AI Model Fails in Production
Your machine learning model had 95% accuracy in testing but crashes in production. The problem isn't the model; it's your data pipeline.
June 26, 2026 · by Abhilash Rao Mesala · 790 Views

Implementing Asynchronous Communication Between Microservices Using Kafka and Spring Boot
Kafka decouples services, buffers spikes, and routes failures to a DLT. Schemas are contracts; consumers must be idempotent.
June 24, 2026 · by Mallikharjuna Manepalli · 1,592 Views

Data Governance Checklist for AI-Driven Systems
A practical checklist for evaluating AI data readiness, covering data quality, governance, lineage, access controls, retrieval systems, and ongoing monitoring.
June 23, 2026 · by Abhishek Gupta (DZone Core) · 1,159 Views · 1 Like

The Real-Time Revolution: Why Blockchain Needs Data Stream Processing
Blockchain and data streaming are bringing unprecedented levels of security, transparency, and real-time mechanisms to move data across the digital world.
June 17, 2026 · by Gautam Goswami (DZone Core) · 1,443 Views

Parallel Kafka Batch Processing With Kotlin Coroutines in Spring Boot
Learn how Kotlin Coroutines improve Spring Boot Kafka batch processing with parallel execution, resource throttling, and faster database operations.
June 16, 2026 · by Erkin Karanlık · 2,165 Views · 1 Like

From ETL to Lakeflow: Shifting to a Declarative Data Paradigm
The article focuses on moving away from traditional, "imperative" ETL processes to a modern, "declarative" approach using the Databricks Lakeflow platform.
June 15, 2026 · by Seshendranath Balla Venkata · 1,387 Views · 1 Like

Rust-Native Alternatives to Spark SQL and DataFrame Workloads
Sail is an open-source computation framework that serves as a drop-in replacement for Apache Spark (SQL and DataFrame API) in both single-host and distributed settings.
June 11, 2026 · by Srinivasarao Rayankula · 1,873 Views · 2 Likes

Combining Temporal and Kafka for Resilient Distributed Systems
Kafka handles durable event streaming while Temporal manages long-running workflow state, retries, and recovery to build resilient distributed systems.
June 9, 2026 · by Akhil Madineni · 1,732 Views · 1 Like

The Big Data Architecture Blueprint: Core Storage, Integration, and Governance Patterns
This comprehensive technical guide breaks down the essential architectural, storage, and integration patterns required to scale enterprise big data platforms.
June 8, 2026 · by Ram Ghadiyaram (DZone Core) · 1,682 Views · 1 Like

Is the Data Warehouse Dead? 3 Patterns From Enterprise Architecture That Answer This Question
No, but its role has fundamentally changed. Here is what I have seen work, after building data platforms at enterprise scale across multiple industries.
June 5, 2026 · by Nabarun Bandyopadhyay · 3,824 Views · 1 Like

Building Threat Intelligence Pipelines Using Python, APIs, and Elasticsearch
STIX/TAXII in, ECS normalized, provenance preserved: deterministic IDs, correct bulk writes, and ingest pipelines keep threat indicator data reliable and queryable under load.
June 3, 2026 · by Krishnaveni Musku · 2,991 Views

Optimizing Databricks Spark Pipelines Using Declarative Patterns
This article explains why hand-tuning Spark is becoming the slow path, and what the declarative alternatives actually look like in production.
June 1, 2026 · by Seshendranath Balla Venkata · 1,318 Views · 1 Like

Event-Driven Pipelines With Apache Pulsar and Go
Build scalable, real-time pipelines with Apache Pulsar and Go using event-driven producers and consumers that communicate via Pulsar topics.
May 29, 2026 · by Shivi Kashyap · 2,912 Views

Contract-First Integration: Building Scalable Systems With Flyway, OpenAPI, and Kafka
Define API, event, and DB contracts upfront to enable parallel development, catch breaking changes in CI, and maintain consistent, reliable integrations.
May 29, 2026 · by Wallace Espindola · 2,601 Views

Building a Zero-Cost Approval Workflow With AWS Lambda Durable Functions
Learn how to build an ETL pipeline with human-in-the-loop approval that costs nothing while waiting, and see real cost data from processing 1,000 documents.
May 28, 2026 · by Harpreet Siddhu · 4,435 Views · 1 Like

Kafka and Spark Structured Streaming in Enterprise: The Patterns That Hold Up Under Pressure
The streaming patterns that survive in the enterprise are those built for scale, failure recovery, and long-term operability.
May 27, 2026 · by Kuladeep Sandra · 6,729 Views

Exactly-Once Processing: Myth vs Reality
Exactly-once is a collection of local guarantees, not an end-to-end guarantee, and real systems rely on idempotency and at-least-once guarantees.
May 26, 2026 · by Irullappan irulandi · 2,658 Views · 2 Likes

Building Enterprise-Grade Real-Time IoT Dashboards with Vue 3, MQTT, and Kafka
Event-driven architecture using MQTT (device communication) → Kafka (durable streams) → WebSocket (browser push) → Vue 3 (reactive UI).
May 26, 2026 · by Venkata Sandeep Dhullipalla · 2,192 Views

Edge Computing in Utility IoT: Two Architecture Patterns That Actually Work
In this article, we break down edge architecture patterns that fit modern utility infrastructure when power flows both ways.
May 22, 2026 · by Yevheniia Mala · 3,947 Views
