What Is Data Governance?

Data governance is a framework developed through the collaboration of individuals with various roles and responsibilities. This framework aims to establish the processes, policies, procedures, standards, and metrics that help organizations achieve their goals. These goals include providing reliable data for business operations, establishing accountability and authoritativeness, developing accurate analytics to assess performance, complying with regulatory requirements, safeguarding data, ensuring data privacy, and supporting the data management life cycle.

Creating a Data Governance Board or Steering Committee is a good first step when introducing a data governance program and framework. An organization's governance framework should be circulated to all staff and management so that everyone understands the changes taking place.

The basic concepts needed to successfully govern data and analytics applications are:

- A focus on business value and the organization's goals
- Agreement on who is responsible for data and who makes decisions
- A model emphasizing data curation and data lineage for data governance
- Decision-making that is transparent and includes ethical principles
- Core governance components, including data security and risk management
- Ongoing training, with monitoring and feedback on its effectiveness
- A collaborative workplace culture, using data governance to encourage broad participation

What Is Data Integration?

Data integration is the process of combining and harmonizing data from multiple sources into a unified, coherent format that various users can consume, for example for operational, analytical, and decision-making purposes. The data integration process consists of four primary components.

1. Source Systems

Source systems, such as databases, file systems, Internet of Things (IoT) devices, media repositories, and cloud data storage, provide the raw information that must be integrated. The heterogeneity of these source systems results in data that can be structured, semi-structured, or unstructured.

- Databases: Centralized or distributed repositories designed to store, organize, and manage structured data. Examples include relational database management systems (RDBMS) like MySQL, PostgreSQL, and Oracle. Data is typically stored in tables with predefined schemas, ensuring consistency and ease of querying.
- File systems: Hierarchical structures that organize and store files and directories on disk drives or other storage media. Common file systems include NTFS (Windows), APFS (macOS), and EXT4 (Linux). Data can be of any type, including structured, semi-structured, or unstructured.
- Internet of Things (IoT) devices: Physical devices (sensors, actuators, etc.) that are embedded with electronics, software, and network connectivity. IoT devices collect, process, and transmit data, enabling real-time monitoring and control. Data generated by IoT devices can be structured (e.g., sensor readings), semi-structured (e.g., device configuration), or unstructured (e.g., video footage).
- Media repositories: Platforms or systems designed to manage and store various types of media files. Examples include content management systems (CMS) and digital asset management (DAM) systems. Data in media repositories can include images, videos, audio files, and documents.
- Cloud data storage: Services that provide on-demand storage and management of data online. Popular cloud data storage platforms include Amazon S3, Microsoft Azure Blob Storage, and Google Cloud Storage. Data in cloud storage can be accessed and processed from anywhere with an internet connection.
2. Data Acquisition

Data acquisition involves extracting and collecting information from source systems. Different methods can be employed depending on the nature of the source system and specific requirements, including batch processes, ETL (Extract, Transform, Load), ELT (Extract, Load, Transform), APIs (Application Programming Interfaces), streaming, virtualization, data replication, and data sharing.

- Batch processes: Batch processes are commonly used for structured data. In this method, data is accumulated over a period of time and processed in bulk. This approach is advantageous for large datasets and ensures data consistency and integrity (a minimal sketch of this pattern appears after this list).
- Application Programming Interface (API): APIs serve as a communication channel between applications and data sources. They allow for controlled and secure access to data. APIs are commonly used to integrate with third-party systems and enable data exchange.
- Streaming: Streaming involves continuous data ingestion and processing. It is commonly used for real-time data sources such as sensor networks, social media feeds, and financial markets. Streaming technologies enable immediate analysis and decision-making based on the latest data.
- Virtualization: Data virtualization provides a logical view of data without physically moving or copying it. It enables seamless access to data from multiple sources, irrespective of their location or format. Virtualization is often used for data integration and reducing data silos.
- Data replication: Data replication involves copying data from one system to another. It enhances data availability and redundancy. Replication can be synchronous, where data is copied in real time, or asynchronous, where data is copied at regular intervals.
- Data sharing: Data sharing involves granting authorized users or systems access to data. It facilitates collaboration, enables insights from multiple perspectives, and supports informed decision-making. Data sharing can be implemented through various mechanisms such as data portals, data lakes, and federated databases.
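To make the batch acquisition pattern concrete, here is a minimal, hedged Python sketch that pulls records from a hypothetical REST endpoint page by page and lands them as newline-delimited JSON for later processing. The URL, parameters, and file path are illustrative assumptions, not part of any specific product.

```python
import json
import requests

# Hypothetical REST endpoint exposing paginated order records (illustrative only).
SOURCE_URL = "https://example.com/api/orders"
OUTPUT_FILE = "orders_batch.jsonl"

def acquire_batch(page_size: int = 500) -> int:
    """Pull all available pages from the source API and append them to a local NDJSON file."""
    page, total = 1, 0
    with open(OUTPUT_FILE, "a", encoding="utf-8") as sink:
        while True:
            response = requests.get(
                SOURCE_URL, params={"page": page, "page_size": page_size}, timeout=30
            )
            response.raise_for_status()
            records = response.json()  # assume the API returns a JSON list per page
            if not records:
                break                  # no more pages: the batch is complete
            for record in records:
                sink.write(json.dumps(record) + "\n")
            total += len(records)
            page += 1
    return total

if __name__ == "__main__":
    print(f"Acquired {acquire_batch()} records in this batch run")
```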
3. Data Storage

Upon data acquisition, storing data in a repository is crucial for efficient access and management. Various data storage options are available, each tailored to specific needs:

- Database Management Systems (DBMS): Relational Database Management Systems (RDBMS) are software systems designed to organize, store, and retrieve data in a structured format. These systems offer advanced features such as data security, data integrity, and transaction management. Examples of popular RDBMS include MySQL, Oracle, and PostgreSQL. NoSQL databases, such as MongoDB and Cassandra, are designed to store and manage semi-structured data. They offer flexibility and scalability, making them suitable for handling large amounts of data that may not fit well into a relational model.
- Cloud storage services: Cloud storage services offer scalable and cost-effective storage solutions in the cloud. They provide on-demand access to data from anywhere with an internet connection. Popular cloud storage services include Amazon S3, Microsoft Azure Storage, and Google Cloud Storage.
- Data lakes: Data lakes are large repositories of raw and unstructured data in their native format. They are often used for big data analytics and machine learning. Data lakes can be implemented using the Hadoop Distributed File System (HDFS) or cloud-based storage services.
- Delta lakes: Delta lakes are a type of data lake that supports ACID transactions and schema evolution. They provide a reliable and scalable data storage solution for data engineering and analytics workloads.
- Cloud data warehouses: Cloud data warehouses are cloud-based data storage solutions designed for business intelligence and analytics. They provide fast query performance and scalability for large volumes of structured data. Examples include Amazon Redshift, Google BigQuery, and Snowflake.
- Big data files: Large collections of data stored in a single file, often used for data analysis and processing tasks. Common big data file formats include Parquet, Apache Avro, and Apache ORC (see the short example after this list).
- On-premises Storage Area Networks (SAN): SANs are dedicated high-speed networks designed for data storage. They offer fast data transfer speeds and provide centralized storage for multiple servers. SANs are typically used in enterprise environments with large storage requirements.
- Network Attached Storage (NAS): NAS devices are file-level storage systems that connect to a network and provide shared storage space for multiple clients. They are often used in small and medium-sized businesses and offer easy access to data from various devices.

Choosing the right data storage option depends on factors such as data size, data type, performance requirements, security needs, and cost considerations. Organizations may use a combination of these storage options to meet their specific data management needs.
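As a small illustration of the "big data files" option above, the following PySpark sketch writes a DataFrame to partitioned Parquet files. The column names and output path are assumptions for the example; the same write call would work against a cloud object store URI.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ParquetStorageExample").getOrCreate()

# A tiny illustrative dataset; in practice this would come from the acquisition step.
orders = spark.createDataFrame(
    [("o-1001", "2024-05-01", 25.40),
     ("o-1002", "2024-05-01", 99.99),
     ("o-1003", "2024-05-02", 12.00)],
    ["order_id", "order_date", "amount"],
)

# Write columnar Parquet files partitioned by date, a common layout for data lakes.
# The local path is a stand-in; an s3a:// or abfss:// URI would target cloud storage.
orders.write.mode("overwrite").partitionBy("order_date").parquet("/tmp/datalake/orders")

spark.stop()
```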
4. Consumption

This is the final stage of the data integration life cycle, where the integrated data is consumed by various applications, data analysts, business analysts, data scientists, AI/ML models, and business processes. The data can be consumed in various forms and through various channels, including:

- Operational systems: The integrated data can be consumed by operational systems using APIs (Application Programming Interfaces) to support day-to-day operations and decision-making. For example, a customer relationship management (CRM) system may consume data about customer interactions, purchases, and preferences to provide personalized experiences and targeted marketing campaigns.
- Analytics: The integrated data can be consumed by analytics applications and tools for data exploration, analysis, and reporting. Data analysts and business analysts use these tools to identify trends, patterns, and insights from the data, which can help inform business decisions and strategies.
- Data sharing: The integrated data can be shared with external stakeholders, such as partners, suppliers, and regulators, through data-sharing platforms and mechanisms. Data sharing enables organizations to collaborate and exchange information, which can lead to improved decision-making and innovation.
- Kafka: Kafka is a distributed streaming platform that can be used to consume and process real-time data. Integrated data can be streamed into Kafka, where it can be consumed by applications and services that require real-time data processing capabilities.
- AI/ML: The integrated data can be consumed by AI (Artificial Intelligence) and ML (Machine Learning) models for training and inference. AI/ML models use the data to learn patterns and make predictions, which can be used for tasks such as image recognition, natural language processing, and fraud detection.

The consumption of integrated data empowers businesses to make informed decisions, optimize operations, improve customer experiences, and drive innovation. By providing a unified and consistent view of data, organizations can unlock the full potential of their data assets and gain a competitive advantage.

What Are Data Integration Architecture Patterns?

In this section, we will delve into an array of integration patterns, each tailored to provide seamless integration solutions. These patterns act as structured frameworks, facilitating connections and data exchange between diverse systems. Broadly, they fall into three categories:

- Real-Time Data Integration
- Near Real-Time Data Integration
- Batch Data Integration

1. Real-Time Data Integration

In various industries, real-time data ingestion serves as a pivotal element. Some practical, real-life examples of its applications:

- Social media feeds display the latest posts, trends, and activities.
- Smart homes use real-time data to automate tasks.
- Banks use real-time data to monitor transactions and investments.
- Transportation companies use real-time data to optimize delivery routes.
- Online retailers use real-time data to personalize shopping experiences.

Understanding real-time data ingestion mechanisms and architectures is vital for choosing the best approach for your organization. There is a wide range of real-time data integration architectures to choose from. Among them, the most commonly used are:

- Streaming-Based Architecture
- Event-Driven Integration Architecture
- Lambda Architecture
- Kappa Architecture

Each of these architectures offers unique advantages and use cases, catering to specific requirements and operational needs.

a. Streaming-Based Data Integration Architecture

In a streaming-based architecture, data streams are continuously ingested as they arrive. Tools like Apache Kafka are employed for real-time data collection, processing, and distribution. This architecture is ideal for handling high-velocity, high-volume data while ensuring data quality and low-latency insights. It involves continuous data ingestion, handles large volumes of data, and prioritizes data quality and low-latency insights. The diagram below illustrates the various components involved in a streaming data integration architecture.
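As a complement, here is a hedged Python sketch of the consuming side of such a streaming pipeline, using the kafka-python client to read JSON events from a Kafka topic as they arrive. The broker address and the "transactions" topic name are placeholders, not assumptions about any particular deployment.

```python
import json
from kafka import KafkaConsumer  # kafka-python client

# Hypothetical broker and topic names; replace with your own cluster settings.
consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers="localhost:9092",
    group_id="streaming-integration-demo",
    auto_offset_reset="earliest",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

# Continuously ingest events as they arrive and hand them to downstream processing.
for message in consumer:
    event = message.value
    # In a real pipeline this is where enrichment, validation, or routing would happen.
    print(f"partition={message.partition} offset={message.offset} event={event}")
```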
b. Event-Driven Integration Architecture

An event-driven architecture is a highly scalable and efficient approach for modern applications and microservices. This architecture responds to specific events or triggers within a system by ingesting data as the events occur, enabling the system to react quickly to changes. This allows for efficient handling of large volumes of data from various sources.

c. Lambda Integration Architecture

The Lambda architecture embraces a hybrid approach, blending the strengths of batch and real-time data ingestion. It comprises two parallel data pipelines, each with a distinct purpose: the batch layer handles the processing of historical data, while the speed layer addresses real-time data. This design ensures low-latency insights while upholding data accuracy and consistency, even in extensive distributed systems.

d. Kappa Data Integration Architecture

Kappa architecture is a simplified variation of the Lambda architecture designed specifically for real-time data processing. It employs a single stream processing engine, such as Apache Flink or Apache Kafka Streams, to manage both historical and real-time data, streamlining the data ingestion pipeline. This approach minimizes complexity and maintenance expenses while delivering rapid and precise insights.

2. Near Real-Time Data Integration

In near real-time data integration, the data is processed and made available shortly after it is generated, which is critical for applications requiring timely data updates. Several patterns are used for near real-time data integration; a few of them are highlighted below.

a. Change Data Capture (CDC)

Change Data Capture (CDC) is a method of capturing changes that occur in a source system's data and propagating those changes to a target system.

b. Data Replication

With the data replication integration architecture, two databases can seamlessly and efficiently replicate data based on specific requirements. This architecture ensures that the target database stays in sync with the source database, providing both systems with up-to-date and consistent data. As a result, the replication process is smooth, allowing for effective data transfer and synchronization between the two databases.

c. Data Virtualization

In data virtualization, a virtual layer integrates disparate data sources into a unified view. It eliminates data replication, dynamically routes queries to source systems based on factors like data locality and performance, and provides a unified metadata layer. The virtual layer simplifies data management, improves query performance, and facilitates data governance and advanced integration scenarios. It empowers organizations to leverage their data assets effectively and unlock their full potential.

3. Batch Data Integration

Batch data integration involves consolidating and conveying a collection of messages or records in a batch to minimize network traffic and overhead. Batch processing gathers data over a period of time and then processes it in batches. This approach is particularly beneficial when handling large data volumes or when the processing demands substantial resources. Additionally, this pattern enables the replication of master data to replica storage for analytical purposes. The advantage of this process is the transmission of refined results. The traditional batch data integration patterns are described below.

Traditional ETL Architecture

This design follows the conventional Extract, Transform, and Load (ETL) process and has three components:

- Extract: Data is obtained from a variety of source systems.
- Transform: Data undergoes a transformation process to convert it into the desired format.
- Load: Transformed data is then loaded into the designated target system, such as a data warehouse.

Incremental Batch Processing

This architecture optimizes processing by focusing only on new or modified data since the previous batch cycle. This approach is more efficient than full batch processing and alleviates the burden on the system's resources. A minimal sketch of a watermark-based incremental extract follows.
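The following Python sketch illustrates the incremental idea under simple assumptions: a source table with an updated_at column, a SQLite database standing in for both source and target, and a persisted watermark that records how far the last batch got. Table and column names are illustrative.

```python
import sqlite3
from pathlib import Path

DB_PATH = "integration_demo.db"             # stand-in for a real source/target database
WATERMARK_FILE = Path("orders.watermark")   # stores the high-water mark of the last run

def read_watermark() -> str:
    return WATERMARK_FILE.read_text().strip() if WATERMARK_FILE.exists() else "1970-01-01T00:00:00"

def incremental_batch() -> int:
    """Copy only rows modified since the last batch cycle from source_orders to target_orders."""
    last_seen = read_watermark()
    with sqlite3.connect(DB_PATH) as conn:
        rows = conn.execute(
            "SELECT order_id, amount, updated_at FROM source_orders "
            "WHERE updated_at > ? ORDER BY updated_at",
            (last_seen,),
        ).fetchall()
        if not rows:
            return 0
        conn.executemany(
            "INSERT OR REPLACE INTO target_orders (order_id, amount, updated_at) VALUES (?, ?, ?)",
            rows,
        )
    # Advance the watermark so the next cycle skips everything processed here.
    WATERMARK_FILE.write_text(rows[-1][2])
    return len(rows)

if __name__ == "__main__":
    print(f"Processed {incremental_batch()} changed rows in this cycle")
```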
Micro Batch Processing

In micro batch processing, small batches of data are processed at regular, frequent intervals. It strikes a balance between traditional batch processing and real-time processing, and it significantly reduces latency compared to conventional batch processing techniques.

Partitioned Batch Processing

In the partitioned batch processing approach, voluminous datasets are divided into smaller, manageable partitions. These partitions can then be processed independently, frequently leveraging the power of parallelism. This methodology significantly reduces processing time, making it an attractive choice for handling large-scale data. A small sketch of partition-level parallelism follows.
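Here is a hedged Python sketch of the partitioned approach: a dataset is split by a partition key (daily date partitions in this assumed example) and each partition is processed in parallel with a process pool. The process_partition logic is a placeholder for real transformation work.

```python
from concurrent.futures import ProcessPoolExecutor

# Assumed partition keys, e.g., one partition per day of data.
PARTITIONS = ["2024-05-01", "2024-05-02", "2024-05-03", "2024-05-04"]

def process_partition(partition_key: str) -> tuple:
    """Placeholder for real work: read the partition, transform it, and load it to the target."""
    # e.g., read /data/orders/date=<partition_key>/*.parquet, clean, aggregate, write out
    rows_processed = 1000  # stand-in result
    return partition_key, rows_processed

if __name__ == "__main__":
    # Each partition is independent, so they can run on separate worker processes.
    with ProcessPoolExecutor(max_workers=4) as pool:
        for key, count in pool.map(process_partition, PARTITIONS):
            print(f"partition {key}: processed {count} rows")
```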
Conclusion

Here are the main points to take away from this article:

- It's important to have a strong data governance framework in place when integrating data from different source systems.
- Data integration patterns should be selected based on the use case, considering factors such as volume, velocity, and veracity.
- There are three styles of data integration (real-time, near real-time, and batch), and the appropriate model should be chosen based on these parameters.

Credit card fraud is a significant concern for financial institutions, as it can lead to considerable monetary losses and damage customer trust. Real-time fraud detection systems are essential for identifying and preventing fraudulent transactions as they occur. Apache Flink is an open-source stream processing framework that excels at handling real-time data analytics. In this deep dive, we'll explore how to implement a real-time credit card fraud detection system using Apache Flink on AWS.

Apache Flink Overview

Apache Flink is a distributed stream processing engine designed for high-throughput, low-latency processing of real-time data streams. It provides robust stateful computations, exactly-once semantics, and a flexible windowing mechanism, making it an excellent choice for real-time analytics applications such as fraud detection.

System Architecture

Our fraud detection system will consist of the following components:

- Kinesis Data Streams: For ingesting real-time transaction data.
- Apache Flink on Amazon Kinesis Data Analytics: For processing the data streams.
- Amazon S3: For storing reference data and checkpoints.
- AWS Lambda: For handling alerts and notifications.
- Amazon DynamoDB: For storing transaction history and fraud detection results.

Setting up the Environment

Before we begin, ensure that you have an AWS account and the AWS CLI installed and configured.

Step 1: Set up Kinesis Data Streams

Create a Kinesis data stream to ingest transaction data:

```shell
aws kinesis create-stream --stream-name CreditCardTransactions --shard-count 1
```

Step 2: Set up S3 Bucket

Create an S3 bucket to store reference data and Flink checkpoints:

```shell
aws s3 mb s3://flink-fraud-detection-bucket
```

Upload your reference datasets (e.g., historical transaction data, customer profiles) to the S3 bucket.

Step 3: Set up DynamoDB

Create a DynamoDB table to store transaction history and fraud detection results:

```shell
aws dynamodb create-table --table-name FraudDetectionResults \
  --attribute-definitions AttributeName=TransactionId,AttributeType=S \
  --key-schema AttributeName=TransactionId,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=10
```

Step 4: Set up Lambda Function

Create a Lambda function to handle fraud alerts. Use the AWS Management Console or the AWS CLI to create a function with the necessary permissions to write to the DynamoDB table and send notifications.

Implementing the Flink Application

Dependencies

Add the following dependencies to your Maven pom.xml file:

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.12.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_2.11</artifactId>
    <version>1.12.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-dynamodb_2.11</artifactId>
    <version>1.12.0</version>
  </dependency>
  <!-- Add other necessary dependencies -->
</dependencies>
```

Flink Application Code

Create a Flink streaming application that reads from the Kinesis data stream, processes the transactions, and writes the results to DynamoDB.
```java
import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.apache.flink.util.Collector;

// Note: in a real project, each public class below lives in its own source file.

// Define your transaction class
public class Transaction {
    public String transactionId;
    public String creditCardId;
    public double amount;
    public long timestamp;
    // Add other relevant fields and methods
}

public class Alert {
    public String alertId;
    public String transactionId;
    // Add other relevant fields and methods
}

// Keyed, stateful fraud detector: keeps a per-card flag in Flink managed state.
public class FraudDetector extends RichFlatMapFunction<Transaction, Alert> {

    private transient ValueState<Boolean> flagState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor =
            new ValueStateDescriptor<>("flag", Boolean.class);
        flagState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Transaction transaction, Collector<Alert> out) throws Exception {
        // Implement your fraud detection logic here
        // Set flagState value based on detection
        // Output an alert if fraud is detected
    }
}

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Kinesis consumer
        Properties inputProperties = new Properties();
        inputProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        inputProperties.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your_access_key_id");
        inputProperties.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your_secret_access_key");
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        DataStream<Transaction> transactionStream = env.addSource(
            new FlinkKinesisConsumer<>(
                "CreditCardTransactions",
                new JSONDeserializationSchema<>(Transaction.class),
                inputProperties
            )
        );

        // Process the stream: key by card so state is scoped per credit card
        DataStream<Alert> alerts = transactionStream
            .keyBy(transaction -> transaction.creditCardId)
            .flatMap(new FraudDetector());

        // Configure the Kinesis producer
        Properties outputProperties = new Properties();
        outputProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
        outputProperties.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "your_access_key_id");
        outputProperties.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "your_secret_access_key");

        FlinkKinesisProducer<String> kinesisProducer = new FlinkKinesisProducer<>(
            new SimpleStringSchema(),
            outputProperties
        );
        kinesisProducer.setDefaultStream("FraudAlerts");
        kinesisProducer.setDefaultPartition("0");

        // Serialize alerts to simple strings before writing them to the alert stream
        alerts
            .map(alert -> alert.alertId + "," + alert.transactionId)
            .addSink(kinesisProducer);

        // Execute the job
        env.execute("Fraud Detection Job");
    }
}
```

Deploying the Flink Application

To deploy the Flink application on Amazon Kinesis Data Analytics, follow these steps:

1. Package your application into a JAR file.
2. Upload the JAR file to an S3 bucket.
3. Create a Kinesis Data Analytics application in the AWS Management Console.
4. Configure the application to use the uploaded JAR file.
5. Start the application.
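To smoke-test the pipeline end to end, you may want to push a few synthetic transactions into the CreditCardTransactions stream. The sketch below uses boto3 for this; the field names mirror the Transaction class above, and the card IDs and amounts are made up for illustration.

```python
import json
import random
import time
import uuid

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

def send_test_transactions(count: int = 10) -> None:
    """Publish synthetic transaction records to the CreditCardTransactions stream."""
    for _ in range(count):
        transaction = {
            "transactionId": str(uuid.uuid4()),
            "creditCardId": random.choice(["card-001", "card-002", "card-003"]),
            "amount": round(random.uniform(1.0, 500.0), 2),
            "timestamp": int(time.time() * 1000),
        }
        kinesis.put_record(
            StreamName="CreditCardTransactions",
            Data=json.dumps(transaction),
            PartitionKey=transaction["creditCardId"],  # keeps a card's events in order
        )

if __name__ == "__main__":
    send_test_transactions()
```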
Monitoring and Scaling

Once your Flink application is running, you can monitor its performance through the Kinesis Data Analytics console. If you need to scale up the processing capabilities, you can increase the number of Kinesis shards or adjust the parallelism settings in your Flink job.

Conclusion

In this deep dive, we've explored how to implement a real-time credit card fraud detection system using Apache Flink on AWS. By leveraging the power of Flink's stream processing capabilities and AWS's scalable infrastructure, we can detect and respond to fraudulent transactions as they occur, providing a robust solution to combat credit card fraud. Remember to test thoroughly and handle edge cases, such as network failures and unexpected data formats, to ensure your system is resilient and reliable.
In today's data-driven environment, mastering the profiling of large datasets with Apache Spark and Deequ is crucial for any professional dealing with data analysis, SEO optimization, or similar fields requiring a deep dive into digital content. Apache Spark offers the computational power necessary for handling vast amounts of data, while Deequ provides a layer for quality assurance, setting benchmarks for what could be termed "unit tests for data." This combination ensures that business users gain confidence in their data's integrity for analysis and reporting purposes.

Have you ever encountered challenges in maintaining the quality of large datasets or found it difficult to ensure the reliability of data attributes used in your analyses? If so, integrating Deequ with Spark could be the solution you're looking for. This article is designed to guide you through the process, from installation to practical application, with a focus on enhancing your workflow and outcomes. By exploring the functionalities and benefits of Deequ and Spark, you will learn how to apply these tools effectively in your data projects, ensuring that your datasets not only meet but exceed quality standards. Let's delve into how these technologies can transform your approach to data profiling and quality control.

Introduction to Data Profiling With Apache Spark and Deequ

Understanding your datasets deeply is crucial in data analytics, and this is where Apache Spark and Deequ shine. Apache Spark is renowned for its fast processing of large datasets, which makes it indispensable for data analytics. Its architecture is adept at handling vast amounts of data efficiently, which is critical for data profiling. Deequ complements Spark by focusing on data quality. This synergy provides a robust solution for data profiling, allowing for the identification and correction of issues like missing values or inconsistencies, which are vital for accurate analysis.

What exactly makes Deequ an invaluable asset for ensuring data quality? At its core, Deequ is built to implement "unit tests for data," a concept that might sound familiar if you have a background in software development. These tests are not for code, however; they're for your data. They allow you to set specific quality benchmarks that your datasets must meet before being deemed reliable for analysis or reporting.

Imagine you're handling customer data. With Deequ, you can easily set up checks to ensure that every customer record is complete, that email addresses follow a valid format, or that no duplicate entries exist. This level of scrutiny is what sets Deequ apart: it transforms data quality from a concept into a measurable, achievable goal.

The integration of Deequ with Apache Spark leverages Spark's scalable data processing framework to apply these quality checks across vast datasets efficiently. This combination does not merely flag issues; it provides actionable insights that guide the correction process. For instance, if Deequ detects a high number of incomplete records in a dataset, you can then investigate the cause, be it a flaw in data collection or an error in data entry, and rectify it, thus enhancing the overall quality of your data. Below is a high-level diagram (source: AWS) that illustrates the Deequ library's usage within the Apache Spark ecosystem.

Setting up Apache Spark and Deequ for Data Profiling

To begin data profiling with Apache Spark and Deequ, setting up your environment is essential.
Ensure Java and Scala are installed, as they are prerequisites for running Spark, which you can verify through Spark's official documentation. For Deequ, which works atop Spark, add the library to your build manager. If you're using Maven, it's as simple as adding the Deequ dependency to your pom.xml file. For SBT, include it in your build.sbt file, and make sure it matches your Spark version.

Python users, you're not left out. PyDeequ is your go-to for integrating Deequ's capabilities into your Python environment. Install it with pip:

```shell
pip install pydeequ
```

After installation, conduct a quick test to ensure everything is running smoothly:

```python
import pydeequ

# Simple test to verify installation
print(pydeequ.__version__)
```

This quick test prints the installed version of PyDeequ, confirming that your setup is ready for action. With these steps, your system is now equipped to perform robust data quality checks with Spark and Deequ, paving the way for in-depth data profiling in your upcoming projects.

Practical Guide To Profiling Data With Deequ

Once your environment is prepared with Apache Spark and Deequ, you're ready to engage in the practical side of data profiling. Let's focus on some of the key metrics that Deequ provides for data profiling: Completeness, Uniqueness, and Correlation. First is Completeness; this metric ensures data integrity by verifying the absence of null values in your data. Uniqueness identifies and eliminates duplicate records, ensuring data distinctiveness. Finally, Correlation quantifies the relationship between two variables, providing insights into data dependencies.

Let's say you have a dataset from IMDb with the following structure:

```
root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: integer (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)
```

We'll use the following Scala script to profile the dataset. This script applies various Deequ analyzers to compute metrics such as the size of the dataset, the completeness of the averageRating column, and the uniqueness of the tconst identifier.

```scala
import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Deequ Profiling Example")
  .getOrCreate()

val data = spark.read.format("csv").option("header", "true").load("path_to_imdb_dataset.csv")

val runAnalyzer: AnalyzerContext = {
  AnalysisRunner
    .onData(data)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("averageRating"))
    .addAnalyzer(Uniqueness("tconst"))
    .addAnalyzer(Mean("averageRating"))
    .addAnalyzer(StandardDeviation("averageRating"))
    .addAnalyzer(Compliance("top rating", "averageRating >= 7.0"))
    .addAnalyzer(Correlation("numVotes", "averageRating"))
    .addAnalyzer(Distinctness("tconst"))
    .addAnalyzer(Maximum("averageRating"))
    .addAnalyzer(Minimum("averageRating"))
    .run()
}

val metricsResult = successMetricsAsDataFrame(spark, runAnalyzer)
metricsResult.show(false)
```
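For Python users, roughly the same profiling run can be expressed with PyDeequ. The sketch below follows PyDeequ's published analyzer examples and mirrors the Scala script above; the dataset path is a placeholder, only a subset of analyzers is shown, and the details should be checked against your installed PyDeequ version.

```python
import pydeequ
from pydeequ.analyzers import (
    AnalysisRunner,
    AnalyzerContext,
    Completeness,
    Correlation,
    Mean,
    Size,
    Uniqueness,
)
from pyspark.sql import SparkSession

# PyDeequ pulls in the Deequ JAR via Maven coordinates exposed by the package.
spark = (
    SparkSession.builder
    .appName("PyDeequ Profiling Example")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

data = spark.read.format("csv").option("header", "true").load("path_to_imdb_dataset.csv")

result = (
    AnalysisRunner(spark)
    .onData(data)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("averageRating"))
    .addAnalyzer(Uniqueness("tconst"))
    .addAnalyzer(Mean("averageRating"))
    .addAnalyzer(Correlation("numVotes", "averageRating"))
    .run()
)

# Convert the computed metrics into a DataFrame for inspection.
AnalyzerContext.successMetricsAsDataFrame(spark, result).show(truncate=False)
```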
Executing the Scala script provides a DataFrame output that reveals several insights about our data. From the output, we observe:

- The dataset has 7,339,583 rows.
- The tconst column's distinctness and uniqueness of 1.0 indicate that every value in the column is unique.
- The averageRating spans from a minimum of 1 to a maximum of 10, averaging 6.88 with a standard deviation of 1.39, highlighting the data's rating variation.
- A completeness score of 0.148 for the averageRating column reveals that only about 15% of the dataset's records have a specified average rating.
- The Pearson correlation coefficient between numVotes and averageRating, which stands at 0.01, indicates an absence of correlation between these two variables, aligning with expectations.

These metrics equip us with insights to navigate the dataset's intricacies, supporting informed decisions and strategic planning in data management.

Advanced Applications and Strategies for Data Quality Assurance

Data quality assurance is an ongoing process, vital for any data-driven operation. With tools like Deequ, you can implement strategies that not only detect issues but also prevent them. By employing data profiling on incremental data loads, we can detect anomalies and maintain consistency over time. For instance, utilizing Deequ's AnalysisRunner, we can observe historical trends and set up checks that capture deviations from expected patterns.

For example, if the usual output of your ETL jobs is around 7 million records, a sudden increase or decrease in this count could be a telltale sign of underlying issues. It's crucial to investigate such deviations, as they may indicate problems with data extraction or loading processes. Utilizing Deequ's Check function allows you to verify compliance with predefined conditions, such as expected record counts, to flag these issues automatically.

Attribute uniqueness, crucial to data integrity, also requires constant vigilance. Imagine discovering a change in the uniqueness score of a customer ID attribute, which should be unwaveringly unique. This anomaly could indicate duplicate records or data breaches. Timely detection through profiling with Deequ's Uniqueness metric will help you maintain the trustworthiness of your data.

Historical consistency is another pillar of quality assurance. Should the averageRating column, which historically fluctuates between 1 and 10, suddenly exhibit values outside this range, that raises questions: is this a data input error or an actual shift in user behavior? Profiling with Deequ helps you discern the difference and take appropriate measures. The AnalysisRunner can be configured to track the historical distribution of averageRating and alert you to any anomalies.
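These guardrails can be expressed as Deequ checks. The following PyDeequ sketch encodes the three expectations discussed above (a record count near 7 million, a unique tconst, and averageRating within 1 to 10); the exact thresholds are illustrative, and the method names are assumed to mirror the Scala Check API as shown in PyDeequ's examples.

```python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

# `spark` and `data` are the session and DataFrame from the profiling example above.
check = (
    Check(spark, CheckLevel.Warning, "Incremental load review")
    .hasSize(lambda rows: 6_500_000 <= rows <= 7_500_000)  # expected record count band
    .isUnique("tconst")                                    # identifier must stay unique
    .hasMin("averageRating", lambda v: v >= 1.0)           # ratings stay in the historical range
    .hasMax("averageRating", lambda v: v <= 10.0)
)

result = (
    VerificationSuite(spark)
    .onData(data)
    .addCheck(check)
    .run()
)

# Each constraint's status (Success/Failure) lands in this DataFrame for alerting.
VerificationResult.checkResultsAsDataFrame(spark, result).show(truncate=False)
```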
Business Use Case for Anomaly Detection Using an Aggregated Metric From Deequ

Consider a business use case where a process is crawling the pages of websites, and it requires a mechanism to identify whether the crawling process is working as expected. To put anomaly detection in place for this process, we can use the Deequ library to capture record counts at particular intervals and feed them into more advanced anomaly detection techniques. For example, suppose a crawl has been identifying 9,500 to 10,500 pages daily on a website over a period of two months. In this case, if the crawl count goes above or below this range, we may want to raise an alert to the team. The diagram below displays the daily calculated record count of pages seen on the website. Using basic statistical techniques like rate of change (how the record count changes day to day), one can see that the changes always oscillate around zero, as shown in the image below.

The diagram below displays the normal distribution of the rate of change, and based on the shape of the bell curve, it is evident that the anticipated change for this data point is around 0%, with a standard deviation of 2.63%. This indicates that for this website, page additions and deletions fall within a range of roughly -5.26% to +5.26% with 90% confidence. Based on this indicator, one can set up a rule on the page record count to raise an alert if the change falls outside this guideline. This is a basic example of using a statistical method over data to identify anomalies in aggregated numbers. Depending on historical data availability and factors such as seasonality, methodologies such as Holt-Winters forecasting can be used for more efficient anomaly detection.

The fusion of Apache Spark and Deequ emerges as a powerful combination that will help you elevate the integrity and reliability of your datasets. Through the practical applications and strategies demonstrated above, we've seen how Deequ not only identifies but prevents anomalies, ensuring the consistency and accuracy of your data. So, if you want to unlock the full potential of your data, I advise you to leverage the power of Spark and Deequ. With this toolset, you will safeguard your data's quality, dramatically enhance your decision-making processes, and make your data-driven insights both robust and reliable.
In the evolving landscape of data engineering, reverse ETL has emerged as a pivotal process for businesses aiming to leverage their data warehouses and other data platforms beyond traditional analytics. Reverse ETL, or "Extract, Transform, Load" in reverse, is the process of moving data from a centralized data warehouse or data lake to operational systems and applications within your data pipeline. This enables businesses to operationalize their analytics, making data actionable by feeding it back into the daily workflows and systems that need it most.

How Does Reverse ETL Work?

Reverse ETL can be visualized as a cycle that begins with data aggregated in a data warehouse. The data is then extracted, transformed (to fit the operational systems' requirements), and finally loaded into various business applications such as a CRM, marketing platforms, or other customer support tools. These concepts can be further explored in this resource on the key components of a data pipeline.

Key Components of Reverse ETL

To effectively implement reverse ETL, it's essential to understand its foundational elements. Each component plays a specific role in ensuring that data flows smoothly from the data warehouse to operational systems, maintaining integrity and timeliness. Here's a closer look at the key components that make reverse ETL an indispensable part of modern data architecture:

- Connectors: Connectors are the bridges between the data warehouse and target applications. They are responsible for the secure and efficient transfer of data.
- Transformers: Transformers modify the data into the appropriate format or structure required by the target systems, ensuring compatibility and maintaining data integrity.
- Loaders: Loaders are responsible for inserting the transformed data into the target applications, completing the cycle of data utilization.
- Data quality: Data quality is paramount in reverse ETL, as it ensures that the data being utilized in operational systems is accurate, consistent, and trustworthy. Without high-quality data, business decisions made based on this data could be flawed, leading to potential losses and inefficiencies.
- Scheduling: Scheduling is crucial for the timeliness of data in operational systems. It ensures that the reverse ETL process runs at optimal times to update the target systems with the latest data, which is essential for maintaining real-time or near-real-time data synchronization across the business.

Evolution of Data Management and ETL

The landscape of data management has undergone significant transformation over the years, evolving to meet the ever-growing demands for accessibility, speed, and intelligence in data handling. ETL processes have been at the core of this evolution, enabling businesses to consolidate and prepare data for strategic analysis and decision-making.

Understanding Traditional ETL

Traditional ETL (Extract, Transform, Load) is a foundational process in data warehousing that involves three key steps, illustrated by the sketch after this list:

- Extract: Data is collected from various operational systems, such as transactional databases, CRM systems, and other business applications.
- Transform: The extracted data is cleansed, enriched, and reformatted to fit the schema and requirements of the data warehouse. This step may involve sorting, summarizing, deduplicating, and validating to ensure the data is consistent and ready for analysis.
- Load: The transformed data is then loaded into the data warehouse, where it is stored and made available for querying and analysis.
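As a toy illustration of those three steps, here is a hedged Python sketch that extracts order rows from a CSV export, applies a small transformation, and loads the result into a SQLite table standing in for the warehouse. File names, column names, and the schema are assumptions made for the example.

```python
import csv
import sqlite3

SOURCE_CSV = "orders_export.csv"   # hypothetical export from an operational system
WAREHOUSE_DB = "warehouse.db"      # SQLite file standing in for the data warehouse

def run_etl() -> int:
    # Extract: read raw rows from the operational export.
    with open(SOURCE_CSV, newline="", encoding="utf-8") as f:
        raw_rows = list(csv.DictReader(f))

    # Transform: clean types, drop incomplete records, and normalize country codes.
    cleaned = [
        (row["order_id"], row["country"].strip().upper(), float(row["amount"]))
        for row in raw_rows
        if row.get("order_id") and row.get("amount")
    ]

    # Load: write the conformed records into the warehouse table.
    with sqlite3.connect(WAREHOUSE_DB) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS fact_orders "
            "(order_id TEXT PRIMARY KEY, country TEXT, amount REAL)"
        )
        conn.executemany(
            "INSERT OR REPLACE INTO fact_orders (order_id, country, amount) VALUES (?, ?, ?)",
            cleaned,
        )
    return len(cleaned)

if __name__ == "__main__":
    print(f"Loaded {run_etl()} rows into the warehouse")
```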
Challenges With Traditional ETL

Traditional ETL has been a staple in data processing and analytics for many years; however, it presents several challenges that can hinder an organization's ability to access and utilize data efficiently.

Data Accessibility

Efficient data access is crucial for timely decision-making, yet traditional ETL can create barriers that impede this flow, such as:

- Data silos: Traditional ETL processes often lead to data silos where information is locked away in the data warehouse, making it less accessible for operational use.
- Limited integration: Integration of new data sources and operational systems can be complex and time-consuming, leading to difficulties in accessing a holistic view of the data landscape.
- Data governance: While governance is necessary, it can also introduce access controls that, if overly restrictive, limit timely data accessibility for users and systems that need it.

Latency

The agility of data-driven operations hinges on the promptness of data delivery, but traditional ETL processes can introduce delays that affect the currency of data insights, for example:

- Batch processing: ETL processes are typically batch-based, running during off-peak hours. This means that data can be outdated by the time it's available in the data warehouse for operational systems, reporting, and analysis.
- Heavy processing loads: Transformation processes can be resource-intensive, leading to delays, especially when managing large volumes of data.
- Pipeline complexity: Complex data pipelines with numerous sources and transformation steps can increase the time it takes to process and load data.

An Introduction to Reverse ETL

Reverse ETL emerged as organizations began to recognize the need not only to make decisions based on their data but to operationalize these insights directly within their business applications. The traditional ETL process focused on aggregating data from operational systems into a central data warehouse for analysis. However, as analytics matured, the insights derived from this data needed to be put into action; this gave rise to differing methods of data transformation based on use case: ETL vs. ELT vs. reverse ETL.

The next evolutionary step was to find a way to move the data and insights from the data warehouse back into the operational systems, effectively turning these insights into direct business outcomes. Reverse ETL was the answer, creating a feedback loop from the data warehouse to operational systems; a minimal sketch of such a loop appears after the list of benefits below. By transforming the data already aggregated, processed, and enriched within the data warehouse and then loading it back into operational tools (the "reverse" of ETL), organizations can enrich their operational systems with valuable, timely insights, thus complementing the traditional data analytics lifecycle.

Benefits of Reverse ETL

As part of the evolution of traditional ETL, reverse ETL presents two key advantages:

- Data accessibility: With reverse ETL, data housed in a data warehouse can be transformed and seamlessly merged back into day-to-day business tools, breaking down silos and making data more accessible across the organization.
- Real-time data synchronization: By moving data closer to the point of action, operational systems get updated with the most relevant, actionable insights, often in near-real-time, enhancing decision-making processes.
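The sketch below closes the loop from the ETL example earlier: it reads an aggregate from the SQLite "warehouse," reshapes it for an operational tool, and pushes it to a hypothetical CRM REST endpoint. The endpoint, credential, and payload shape are all assumptions; a real implementation would use the target system's actual API or a dedicated reverse ETL product.

```python
import sqlite3

import requests

WAREHOUSE_DB = "warehouse.db"  # same stand-in warehouse as in the earlier sketch
CRM_ENDPOINT = "https://example.com/crm/api/customer-attributes"  # hypothetical CRM API
API_TOKEN = "replace-me"       # placeholder credential

def reverse_etl() -> int:
    # Extract: pull an aggregated insight out of the warehouse.
    with sqlite3.connect(WAREHOUSE_DB) as conn:
        rows = conn.execute(
            "SELECT country, SUM(amount) AS lifetime_value FROM fact_orders GROUP BY country"
        ).fetchall()

    # Transform: reshape the rows into the payload the operational tool expects.
    payload = [{"segment": country, "lifetime_value": round(total, 2)} for country, total in rows]

    # Load: push the enriched attributes into the CRM so business teams can act on them.
    response = requests.post(
        CRM_ENDPOINT,
        json={"attributes": payload},
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        timeout=30,
    )
    response.raise_for_status()
    return len(payload)

if __name__ == "__main__":
    print(f"Synced {reverse_etl()} segments to the CRM")
```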
Common Challenges of Reverse ETL

Despite the key benefits of reverse ETL, there are several common challenges to consider:

- Data consistency and quality: Ensuring the data remains consistent and high-quality as it moves back into varied operational systems requires rigorous checks and ongoing maintenance.
- Performance impact on operational systems: Introducing additional data loads to operational systems can impact their performance, which must be carefully managed to avoid disruption to business processes.
- Security and regulatory compliance: Moving data out of the data warehouse raises concerns about security and compliance, especially when dealing with sensitive or regulated data.

Understanding these challenges and benefits helps organizations effectively integrate reverse ETL into their data-driven workflow, enriching operational systems with valuable insights and enabling more informed decisions across the entire business.

Reverse ETL Use Cases and Applications

Reverse ETL unlocks the potential of data warehouses by bringing analytical insights directly into the operational tools that businesses use every day. Here are some of the most impactful ways that reverse ETL is being applied across various business functions:

- Customer Relationship Management (CRM): Reverse ETL tools transform and sync demographic and behavioral data from the data warehouse into CRM systems, providing sales teams with enriched customer profiles for improved engagement strategies.
- Marketing automation: Utilize reverse ETL's transformation features to tailor customer segments based on data warehouse insights and sync them with marketing platforms, enabling targeted campaigns and in-depth performance reporting.
- Customer support: Transform and integrate product usage patterns and customer feedback from the data warehouse into support tools, equipping agents with actionable data to personalize customer interactions.
- Product development: Usage-driven development leverages reverse ETL to transform and feed feature interaction data back into product management tools, guiding the development of features that align with user engagement and preferences.

In each of these use cases, reverse ETL tools not only move data but also apply the necessary transformations to ensure that the data fits the operational context of the target systems, enhancing the utility and applicability of the insights provided.

Five Factors to Consider Before Implementing Reverse ETL

When considering the implementation of reverse ETL at your organization, it's important to evaluate several factors that can impact the success and efficiency of the process. Here are some key considerations:

1. Data Volume

Assess the volume of data that will be moved to ensure that the reverse ETL tool can handle the load without performance degradation. Determine the data throughput needs, considering peak times and whether the tool can process large batches of data efficiently.

2. Data Integration Complexity

Consider the variety of data sources and target systems and whether the reverse ETL tool supports all necessary connectors. Evaluate the complexity of the data transformations required and whether the tool provides the necessary functionality to implement these transformations easily.

3. Scalability

Ensure that the reverse ETL solution can scale with your business needs, handling increased data loads and additional systems over time.

4. Application Deployment and Maintenance

Verify that the tool is accessible through preferred web browsers like Chrome and Safari.
Determine whether the tool can be cloud-hosted or self-hosted, and understand the hosting preferences of your enterprise customers (on-prem vs. cloud). Look for built-in integration with version control systems like GitHub for detecting and applying configuration changes.

5. Security

When implementing reverse ETL, ensure robust security by confirming the tool's adherence to SLAs with uptime monitoring, a clear process for regular updates and patches, and compliance with data protection standards like GDPR. Additionally, verify the tool's capability for data tokenization, its encryption standards for data at rest, and its possession of key certifications like SOC 2 Type 2 and EU/US Privacy Shield.

By weighing these factors, organizations can ensure that the reverse ETL tool they select not only meets their data processing needs but also aligns with their technical infrastructure, security standards, and regulatory compliance requirements.

Reverse ETL Best Practices

To maximize the benefits of reverse ETL, it's essential to adhere to best practices that ensure the process is efficient, secure, and scalable. These practices lay the groundwork for a robust data infrastructure:

- Data governance: Establish clear data governance policies to maintain data quality and compliance throughout the reverse ETL process.
- Monitoring and alerting: Implement comprehensive monitoring and alerting to quickly identify and resolve issues with data pipelines.
- Scalability and performance: Design reverse ETL workflows with scalability in mind to accommodate future growth and ensure that they do not negatively impact the performance of source or target systems.

Top Three Reverse ETL Tools

Choosing the right reverse ETL tool is crucial for success. Here's a brief overview of three popular platforms:

- Hightouch: A platform that specializes in syncing data from data warehouses directly to business tools, offering a wide range of integrations and a user-friendly interface.
- Census: Known for its strong integration capabilities, Census allows businesses to operationalize their data warehouse content across their operational systems.
- Segment: Known for its customer data platform (CDP), Segment provides reverse ETL features that allow businesses to use their customer data in marketing, sales, and customer service applications effectively.

To help select the most suitable reverse ETL tool for your organization's needs, here's a comparison table that highlights key features and differences between example solutions:

Reverse ETL Tool Comparison

| Feature | Hightouch | Census | Segment |
|---|---|---|---|
| Core Offering | Reverse ETL | Reverse ETL | CDP + limited reverse ETL |
| Connectors | Extensive | Broad | Broad |
| Custom Connector | Yes | Yes | Yes |
| Real-Time Sync | Yes | Yes | Yes |
| Transformation Layer | Yes | Yes | Only available on customer data |
| Security & Compliance | Strong | Strong | Strong |
| Pricing Model | Rows-based | Fields-based | Tiered |

Bottom Line: Is Reverse ETL Right for Your Business?

Reverse ETL can be a game-changer for businesses looking to leverage their data warehouse insights in operational systems and workflows. If your organization requires real-time data access, enhanced customer experiences, or more personalized marketing efforts, reverse ETL could be the right solution. However, it's essential to consider factors such as data volume, integration complexity, and security requirements to ensure that a reverse ETL tool aligns with your business objectives and technical requirements.
JavaScript is a pivotal technology for web applications. With the emergence of Node.js, JavaScript became relevant for both client-side and server-side development, enabling a full-stack development approach with a single programming language. Both Node.js and Apache Kafka are built around event-driven architectures, making them naturally compatible for real-time data streaming. This blog post explores open-source JavaScript clients for Apache Kafka and discusses the trade-offs and limitations of JavaScript Kafka producers and consumers compared to stream processing technologies such as Kafka Streams or Apache Flink.

JavaScript: A Pivotal Technology for Web Applications

JavaScript is a pivotal technology for web applications, serving as the backbone of interactive and dynamic web experiences. Here are several reasons JavaScript is essential for web applications:

- Interactivity: JavaScript enables the creation of highly interactive web pages. It responds to user actions in real time, allowing for the development of features such as interactive forms, animations, games, and dynamic content updates without the need to reload the page.
- Client-side scripting: Running in the user's browser, JavaScript reduces server load by handling many tasks on the client's side. This can lead to faster web page loading times and a smoother user experience.
- Universal browser support: All modern web browsers support JavaScript, making it a universally accessible programming language for web development. This wide support ensures that JavaScript-based features work consistently across different browsers and devices.
- Versatile frameworks and libraries: The JavaScript ecosystem includes a vast array of frameworks and libraries (such as React, Angular, and Vue.js) that streamline the development of web applications, from single-page applications to complex web-based software. These tools offer reusable components, two-way data binding, and other features that enhance productivity and maintainability.
- Real-time applications: JavaScript is ideal for building real-time applications, such as chat apps and live streaming services, thanks to technologies like WebSockets and frameworks that support real-time communication.
- Rich web APIs: JavaScript can access a wide range of web APIs provided by browsers, allowing for the development of complex features, including manipulating the Document Object Model (DOM), making HTTP requests (AJAX or Fetch API), handling multimedia, and tracking user geolocation.
- SEO and performance optimization: Modern JavaScript frameworks and server-side rendering solutions help in building fast-loading web pages that are also search engine friendly, addressing one of the traditional criticisms of JavaScript-heavy applications.

In short, JavaScript's capabilities offer the tools and flexibility needed to build everything from simple websites to complex, high-performance web applications.

Full-Stack Development: JavaScript for the Server Side With Node.js

With the advent of Node.js, JavaScript is no longer used only for the client side of web applications. It serves both client-side and server-side development, enabling a full-stack approach with a single programming language. This simplifies the development process and allows for seamless integration between the frontend and backend.
Using JavaScript for backend applications, especially with Node.js, offers several advantages:

- Unified language for frontend and backend: JavaScript on the backend allows developers to use the same language across the entire stack, simplifying development and reducing context switching. This can lead to more efficient development processes and easier maintenance.
- High performance: Node.js is a popular JavaScript runtime built on Chrome's V8 engine, which is known for its speed and efficiency. Node.js uses a non-blocking, event-driven architecture, which makes it particularly suitable for I/O-heavy operations and real-time applications like chat applications and online gaming.
- Vast ecosystem: JavaScript has one of the largest ecosystems, powered by npm (Node Package Manager). npm provides a vast library of modules and packages that can be easily integrated into your projects, significantly reducing development time.
- Community support: The JavaScript community is one of the largest and most active, offering a wealth of resources, frameworks, and tools. This community support can be invaluable for solving problems, learning new skills, and staying up to date with the latest technologies and best practices.
- Versatility: JavaScript with Node.js can be used for developing a wide range of applications, from web and mobile applications to serverless functions and microservices. This versatility makes it a go-to choice for many developers and companies.
- Real-time data processing: JavaScript is well-suited for applications requiring real-time data processing and updates, such as live chats, online gaming, and collaboration tools, because of its non-blocking nature and efficient handling of concurrent connections.
- Cross-platform development: Tools like Electron and React Native allow JavaScript developers to build cross-platform desktop and mobile applications, respectively, further extending JavaScript's reach beyond the web.

Node.js's efficiency and scalability, combined with the ability to use JavaScript for both frontend and backend development, have made it a popular choice among developers and companies around the world. Its non-blocking, event-driven I/O characteristics are a perfect match for an event-driven architecture.

JavaScript and Apache Kafka for Event-Driven Applications

Using Node.js with Apache Kafka offers several benefits for building scalable, high-performance applications that require real-time data processing and streaming capabilities. Here are several reasons integrating Node.js with Apache Kafka is helpful:

- Unified language for full-stack development: Node.js allows developers to use JavaScript across both the client and server sides, simplifying development workflows and enabling seamless integration between frontend and backend systems, including Kafka-based messaging or event streaming architectures.
- Event-driven architecture: Both Node.js and Apache Kafka are built around event-driven architectures, making them naturally compatible. Node.js can efficiently handle Kafka's real-time data streams, processing events asynchronously and without blocking.
- Scalability: Node.js is known for its ability to handle concurrent connections efficiently, which complements Kafka's scalability. This combination is ideal for applications that require handling high volumes of data or requests simultaneously, such as IoT platforms, real-time analytics, and online gaming.
Large ecosystem and community support: Node.js's extensive npm ecosystem includes Kafka libraries and tools that facilitate the integration. This support speeds up development, offering pre-built modules for connecting to Kafka clusters, producing and consuming messages, and managing topics. Real-time data processing: Node.js is well-suited for building applications that require real-time data processing and streaming, a core strength of Apache Kafka. Developers can leverage Node.js to build responsive and dynamic applications that process and react to Kafka data streams in real time. Microservices and cloud-native applications: The combination of Node.js and Kafka is powerful for developing microservices and cloud-native applications. Kafka serves as the backbone for inter-service communication. Node.js is used to build lightweight, scalable service components. Flexibility and speed: Node.js enables rapid development and prototyping. Kafka environments can implement new streaming data pipelines and applications quickly. In summary, using Node.js with Apache Kafka leverages the strengths of both technologies to build efficient, scalable, and real-time applications. The combination is an attractive choice for many developers. Open Source JavaScript Clients for Apache Kafka Various open-source JavaScript clients exist for Apache Kafka. Developers use them to build everything from simple message production and consumption to complex streaming applications. When choosing a JavaScript client for Apache Kafka, consider factors like performance requirements, ease of use, community support, commercial support, and compatibility with your Kafka version and features. For working with Apache Kafka in JavaScript environments, several clients and libraries can help you integrate Kafka into your JavaScript or Node.js applications. Here are some of the notable JavaScript clients for Apache Kafka from recent years: kafka-node: One of the original Node.js clients for Apache Kafka, kafka-node provides a straightforward and comprehensive API for interacting with Kafka clusters, including producing and consuming messages. node-rdkafka: This client is a high-performance library for Apache Kafka that wraps the native librdkafka library. It's known for its robustness and is suitable for heavy-duty operations. node-rdkafka offers advanced features and high throughput for both producing and consuming messages. KafkaJS: An Apache Kafka client for Node.js written entirely in JavaScript, KafkaJS focuses on simplicity and ease of use and supports the latest Kafka features. It is designed to be lightweight and flexible, making it a good choice for applications that require a simple and efficient way to interact with a Kafka cluster. Challenges With Open Source Projects In General Open source projects are only successful if an active community maintains them. Therefore, familiar issues with open source projects include: Lack of documentation: Incomplete or outdated documentation can hinder new users and contributors. Complex contribution process: A complicated process for contributing can deter potential contributors. This is not just a disadvantage, as it also guarantees code reviews and quality checks of new commits. Limited support: Relying on community support can lead to slow issue resolution times. Critical projects often require commercial support from a vendor. Project abandonment: Projects can become inactive if maintainers lose interest or lack time.
Code quality and security: Ensuring high code quality and addressing security vulnerabilities can be challenging if nobody is clearly responsible or bound by critical SLAs. Governance issues: Disagreements on project direction or decisions can lead to forks or conflicts. Issues With Kafka's JavaScript Open Source Clients Some of the above challenges apply to the available open-source JavaScript clients for Kafka. We have seen maintenance inactivity and quality issues as the biggest challenges in these projects. And be aware that it is difficult for maintainers to keep up not only with issues but also with new KIPs (Kafka Improvement Proposals). The Apache Kafka project is active and ships new features in releases two to three times a year. kafka-node, KafkaJS, and node-rdkafka are all on different parts of the "unmaintained" spectrum. For example, kafka-node has not had a commit in 5 years. KafkaJS had an open call for maintainers around a year ago. Additionally, commercial support was not available for enterprises to get guaranteed response times and support in case of production issues. Unfortunately, production issues happened regularly in critical deployments. For this reason, Confluent open-sourced a new JavaScript client for Apache Kafka with guaranteed maintenance and commercial support. Confluent's Open Source JavaScript Client for Kafka, Powered by librdkafka Confluent provides a Kafka client for JavaScript. This client works with Confluent Cloud (fully managed service) and Confluent Platform (self-managed deployments). But it is an open-source project and works with any Apache Kafka environment. The JavaScript client for Kafka comes with a long-term support and development strategy. The source code is available now on GitHub, and the client is available via npm. npm (Node Package Manager) is the default package manager for Node.js. This JavaScript client is a librdkafka-based library (derived from node-rdkafka) with API compatibility with the very popular KafkaJS library. Users of KafkaJS can easily migrate their code over (details in the migration guide in the repo). At the time of writing in February 2024, the new Confluent JavaScript Kafka Client is in early access and not for production usage. GA follows later in 2024. Please review the GitHub project, try it out, and share feedback and issues when you build new projects or migrate from other JavaScript clients. What About Stream Processing? Keep in mind that Kafka clients only provide a produce and consume API. However, the real potential of event-driven architectures comes with stream processing. This is a computing paradigm that allows for the continuous ingestion, processing, and analysis of data streams in real time. Event stream processing enables immediate responses to incoming data without the need to store and process it in batches. Stream processing frameworks like Kafka Streams or Apache Flink offer several key features that enable real-time data processing and analytics: State management: Stream processing systems can manage state across data streams, allowing for complex event processing and aggregation over time. Windowing: They support processing data in windows, which can be based on time, data size, or other criteria, enabling temporal data analysis. Exactly-once processing: Advanced systems provide guarantees for exactly-once processing semantics, ensuring data is processed once and only once, even in the event of failures.
Integration with external systems: They offer connectors for integrating with various data sources and sinks, including databases, message queues, and file systems. Event time processing: They can handle out-of-order data based on the time events actually occurred, not just when they are processed. Stream processing frameworks are NOT available for most programming languages, including JavaScript. Therefore, if you live in the JavaScript world, you have three options: Build all the stream processing capabilities by yourself. Trade-off: A lot of work! Leverage a stream processing framework in SQL (or another programming language). Trade-off: This is not JavaScript! Don't do stream processing and stay with APIs and databases. Trade-off: Cannot solve many innovative use cases. Apache Flink provides APIs for Java, Python, and ANSI SQL. SQL is an excellent option to complement JavaScript code. In a fully managed data streaming platform like Confluent Cloud, you can leverage serverless Flink SQL for stream processing and combine it with your JavaScript applications. One Programming Language Does NOT Solve All Problems JavaScript has broad adoption and sweet spots for client and server development. The new Kafka Client for JavaScript from Confluent is open source and has a long-term development strategy, including commercial support. Easy migration from KafkaJS makes the adoption very simple. If you can live with the dependency on librdkafka (which is acceptable for most situations), then this is the way to go for JavaScript Node.js development with Kafka producers and consumers. JavaScript is NOT an all-rounder. The data streaming ecosystem is broad, open, and flexible. Modern enterprise architectures leverage microservices or data mesh principles. You can choose the right technology for your application. Learn how to build data streaming applications using your favorite programming language and open-source Kafka client by looking at Confluent's developer examples: JavaScript/Node.js Java HTTP/REST C/C++/.NET Kafka Connect DataGen Go Spring Boot Python Clojure Groovy Kotlin Ruby Rust Scala Which JavaScript Kafka client do you use? What are your experiences? Or do you already develop most applications with stream processing using Kafka Streams or Apache Flink? Let’s connect on LinkedIn and discuss it!
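To make the client discussion above concrete, here is a minimal, hedged sketch of producing and consuming with the widely used KafkaJS-style API, which Confluent's new client aims to stay compatible with; the broker address, topic name, and group ID are placeholders, and the exact import path for Confluent's package is documented in its GitHub repository rather than assumed here. JavaScript
// Minimal sketch using the KafkaJS-style API (the API surface Confluent's new
// client targets for compatibility). Broker, topic, and group ID are placeholders.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'demo-app', brokers: ['localhost:9092'] });

async function run() {
  // Produce a single message to a hypothetical 'orders' topic.
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({
    topic: 'orders',
    messages: [{ key: 'order-1', value: JSON.stringify({ total: 42.5 }) }],
  });
  await producer.disconnect();

  // Consume and log messages from the same topic.
  const consumer = kafka.consumer({ groupId: 'demo-group' });
  await consumer.connect();
  await consumer.subscribe({ topics: ['orders'], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}] ${message.key}: ${message.value.toString()}`);
    },
  });
}

run().catch(console.error);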
Lately, I have been working with Polars and PySpark, which brings me back to the days when Spark fever was at its peak and every data processing solution seemed to revolve around it. This prompts me to question: was it really necessary? Let's delve into my experiences with various data processing technologies. Background During my final degree project on sentiment analysis, Pandas was just beginning to emerge as the primary tool for feature engineering. It was user-friendly and seamlessly integrated with several machine learning libraries, such as scikit-learn. Then, as I started working, Spark became a part of my daily routine. I used it for ETL processes in a nascent data lake to implement business logic, although I wondered if we were over-engineering the process. Typically, the data volumes we handled were not substantial enough to necessitate using Spark, yet it was employed every time new data entered the system: we would set up a cluster and proceed with processing using Spark. In only a few instances did I genuinely feel that Spark was the right tool for the job. This experience pushed me to develop a lightweight ingestion framework using Pandas. However, this framework did not perform as expected, struggling with medium to large files. Recently, I've started using Polars for some tasks and have been impressed by its performance in processing datasets with several million rows. This led me to set up a benchmark comparing all of these tools. Let's dive into it! A Little Bit of Context Pandas We shouldn't forget that Pandas has been the dominant tool for data manipulation, exploration, and analysis. Pandas rose in popularity among data scientists thanks to its similarities with R's data frame grid view. Moreover, it integrates closely with other Python libraries in the machine learning field: NumPy is a mathematical library for linear algebra and standard numerical calculations. Pandas is built on top of NumPy. Scikit-learn is the reference library for machine learning applications. Normally, all the data used for a model has been loaded, visualized, and analyzed with Pandas or NumPy. PySpark Spark is a free, distributed platform that changed the paradigm of big data processing, with PySpark as its Python library. It offers a unified computing engine with exceptional features: In-memory processing: Spark's major feature is its in-memory architecture, which is fast because it keeps data in memory rather than on disk. Fault tolerance: The fault tolerance mechanisms built into the engine ensure dependable data processing. Resilient Distributed Datasets (RDDs) track data lineage and allow for automatic recovery in case of node failures. Scalability: Spark's horizontally scalable architecture distributes large datasets across a cluster and processes them in parallel, using the combined power of all nodes. Polars Polars is a Python library built on top of Rust, combining the flexibility and user-friendliness of Python with the speed and scalability of Rust. Rust is a low-level language that prioritizes performance, reliability, and productivity. It is memory efficient and delivers performance on par with C and C++. Polars, in turn, builds on Apache Arrow's columnar memory format to execute vectorized queries. Apache Arrow is a cross-language development platform for fast in-memory processing.
Polars executes tabular data manipulation, analysis, and transformation operations with near-instant speed, which favors its use with large datasets. Moreover, its expression syntax is SQL-like, making even complex data processing easy to express. Another capability is its laziness, which defers query evaluation and applies query optimization. Benchmarking Setup Here is a link to the GitHub project with all the information. There are four notebooks, one per tool (two for Polars, to test eager and lazy evaluation). The code measures execution time for the following tasks: Reading Filtering Aggregations Joining Writing There are five datasets of increasing size: 50,000, 250,000, 1,000,000, 5,000,000, and 25,000,000 rows. The idea is to test different scenarios and sizes. The data used for this test is a financial dataset from Kaggle. The tests were executed on: macOS Sonoma, Apple M1 Pro, 32 GB RAM. Table of Execution Times

Row Size | Pandas | Polars Eager | Polars Lazy | PySpark
50,000 Rows | 0.368 | 0.132 | 0.078 | 1.216
250,000 Rows | 1.249 | 0.096 | 0.156 | 0.917
1,000,000 Rows | 4.899 | 0.302 | 0.300 | 1.850
5,000,000 Rows | 24.320 | 1.605 | 1.484 | 7.372
25,000,000 Rows | 187.383 | 13.001 | 11.662 | 44.724

Analysis Pandas performed poorly, especially as dataset sizes increased. However, it could handle small datasets with decent performance. PySpark, while executed on a single machine, shows considerable improvement over Pandas as the dataset size grows. Polars, both in eager and lazy configurations, significantly outperforms the other tools, showing improvements of up to 95-97% compared to Pandas and 70-75% compared to PySpark, confirming its efficiency in handling large datasets on a single machine. Visual Representations These visual aids help underline the relative efficiencies of the different tools across various test conditions. Conclusion The benchmarking results offer clear insight into the performance scalability of four widely used data processing configurations across varying dataset sizes. From the analysis, several critical conclusions emerge: Pandas performance scalability: Popular for data manipulation on smaller datasets, it struggles significantly as the data volume increases, indicating it is not the best choice for high-volume data. However, its integration with many machine learning and statistical libraries makes it indispensable for data science teams. Efficiency of Polars: Both configurations of Polars (eager and lazy) demonstrate exceptional performance across all tested scales, outperforming Pandas and PySpark by a wide margin, making Polars an efficient tool capable of processing large datasets. However, Polars has not yet released a major version of its Python library, and until it does, I don't recommend it for production systems. Tool selection strategy: The findings underscore the importance of selecting the right tool based on the specific needs of the project and the available resources. For small to medium-sized datasets, Polars offers a significant performance advantage. For large-scale distributed processing, PySpark remains a robust option. Future considerations: As dataset sizes continue to grow and processing demands increase, the choice of data processing tools will become more critical. Tools like Polars, built on Rust, are emerging, and their results have to be taken into account. Also, the tendency to use Spark as a solution for processing everything is fading, and these tools are taking its place when there is no need for large-scale distributed systems.
Use the right tool for the right job!
Kafka is a powerful streaming platform used for building real-time data streaming applications. When data is streamed into a Kafka broker, Kafka can provide metadata about each message published to a Kafka topic. This metadata can be retrieved using Kafka's built-in RecordMetadata class as an acknowledgment, which can be used to build a guaranteed message delivery mechanism. This article will explore RecordMetadata, its attributes, and how it can be leveraged in various use cases. What Is RecordMetadata? Before delving into the details of the RecordMetadata class, let's establish some key Kafka concepts. In Kafka, the producer sends streams of data to the Kafka broker, which receives, stores, and serves messages to consumers. The Kafka Consumer API allows applications to read streams of data from the cluster. When a producer or publisher sends a message or payload to a Kafka topic, the broker processes this message and returns a response to the producer. This response includes a RecordMetadata object acknowledged by the server. A RecordMetadata instance contains details such as the topic name, partition number, offset, timestamp, and more. RecordMetadata Attributes Topic: The name of the topic to which the message was delivered. Partition: A topic can be divided into partitions to handle data volume, and this partition number within the topic indicates where the message was stored. Offset: A unique identifier for the record within the partition that helps find the exact location of the message. Timestamp: The timestamp when a Kafka broker receives the message or payload. Serialized key and value size: The size of the serialized key and value of the message, useful for efficient storage. Checksum: Brokers utilize it to make sure messages haven't been corrupted during storage or transmission. Use Cases for RecordMetadata The RecordMetadata class provides essential information that can be used in various scenarios. Here are some common use cases: 1. Monitoring and Logging In one of my past projects, we had to make sure that external data ingested into our ecosystem went to a data lake, routed through a Kafka broker, for monitoring, audit, and reporting purposes. The initial deployment went really well, but we started noticing some glitches with overall Kafka availability, and these issues were mainly associated with the underlying cloud provider's network. We leveraged RecordMetadata to log and monitor the details of produced records. We captured metadata such as the topic, partition, and offset to keep track of the messages flowing successfully through the Kafka infrastructure. For successful scenarios, the topic, partition, and offset details were inserted into the database. Java
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-name", "key", "value");
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Record delivered to topic " + metadata.topic()
                + " partition number: " + metadata.partition()
                + " with offset details: " + metadata.offset());
        // Save the above info in the database
    } else {
        exception.printStackTrace();
    }
});
2. Handle Errors With a Retry Process As mentioned above, network issues and broker downtime can cause message production failures. By leveraging RecordMetadata, data producers can implement intelligent error handling and retry mechanisms to make sure each and every message is published, fulfilling audit and regulatory requirements.
For instance, if a message fails to be produced, the producer can log the metadata and attempt to resend the message to the broker on the fly. If the issue persists, a separate process can pick this message up from the data store and retry it at a later time based on the flag status saved in the database. Example: Java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Error publishing the message: " + exception.getMessage());
        // Implement retry logic here
        // Add a flag in the database if it is not successful after the "Nth" retry.
    }
});
3. Performance Metrics Performance metrics can be generated using RecordMetadata attributes. By evaluating these attributes, developers can write a few lines of code to measure the latency and throughput of their Kafka message delivery operations. This information is important for optimizing performance and adhering to SLAs. Preventive measures can be implemented for high-latency scenarios to contain the issue locally and reduce the overall blast radius. Example: Java
long initialTime = System.currentTimeMillis();
producer.send(record, (metadata, exception) -> {
    long timeToDeliverMessage = System.currentTimeMillis() - initialTime;
    if (exception == null) {
        System.out.println("Message delivered successfully in " + timeToDeliverMessage + " ms");
    } else {
        // log errors
        exception.printStackTrace();
    }
});
Conclusion The RecordMetadata class provides information about responses from a Kafka broker that developers can use to implement monitoring, error handling, and performance metrics. Data producers can utilize these attributes to build a more reliable and efficient guaranteed data streaming implementation that fulfills organizational needs. References Kafka Documentation Producer API: RecordMetadata Kafka Message Acknowledgement: A Deep Dive
Snowflake brings unbeatable uniqueness to the tech world: it is a cloud-based data warehousing solution that aims to remove the nightmares associated with business data storage, management, and analytics. Essentially, it looks much like an all-in-one platform for data utilization in ways traditional setups could only have wished for. Before we go deep into Snowflake, let's first clarify what a data warehouse is: an extensive system that stores and analyzes large data sets from many sources. The main objective? To ensure businesses can make decisions based on insights from their data. Traditional data solutions are hardware-dependent, complex to deploy, and limited in scalability. Cloud solutions, on the other hand, offer flexibility, scalability, and lower upfront costs, and these are the qualities driving the shift toward technologies that adapt to business needs, such as Snowflake. Core Components of Snowflake Database Storage Snowflake's unique architecture dynamically manages data, making it accessible and secure across multiple cloud platforms and for all data types. The intelligent architecture ensures low storage costs through very effective data compression and partitioning. Snowflake provides robust security capabilities, such as always-on encryption and finely tuned access controls, for the highest levels of data integrity and compliance. Query Snowflake's query engines, called 'Virtual Warehouses,' process queries independently and provide real-time data processing without lag. Independent compute clusters: These help scale and optimize performance. The clusters work independently, allowing users to scale up or down based on performance needs without affecting other operations. Snowflake also offers job prioritization, enabling the smooth running of critical queries while less important ones wait in line. Cloud Services Layer This layer supports all core operations, from storage to query execution, ensuring seamless performance and security. Snowflake enables best-in-class live data sharing between groups without moving the data, enabling maximum collaboration. Background processes are managed to ensure no impact on business operations. Data Management Within Snowflake Structured and Semi-Structured Data Loading and Transformation Processes Snowflake minimizes manual effort and improves data loading and transformation accuracy through automation. It can easily process many data formats, both structured and semi-structured (such as JSON), without users needing standalone data transformation tools. Snowflake Architecture for Warehouse Scalability and Flexibility Vertical and horizontal scaling: Whether you need more computing power (vertical) or need to handle more operations simultaneously (horizontal), Snowflake scales smoothly. Adapting performance requirements to costs: This means scaling resources up or down with a few clicks, optimizing performance, and controlling costs more effectively. Elasticity: Snowflake automatically adapts to changes in workload without manual intervention, consistently maintaining high performance even during unexpected surges in workload.
Data Cloning and Time Travel Benefits of zero-copy cloning for developers: This capability enables developers to clone databases or tables without adding storage costs, resulting in shorter testing and development timeframes. Data retrieval through Time Travel: Time Travel allows you to access and restore data from any given point within a configurable past window, which is critical for unexpected data recovery needs. Implementation of cloning and Time Travel features: From basic error correction to historical analysis, these features provide the crucial tools to manipulate and manage data effectively. Integration and Compatibility Integration With Other Services Connecting Snowflake with BI tools and ETL systems: Snowflake integrates with a host of third-party BI and ETL tools, making data workflows easy and improving overall productivity. API and driver support: Enjoy full API and driver support for popular programming languages to easily integrate Snowflake into your tech stack. Collaboration across diverse platforms and cloud providers: Thanks to Snowflake's enterprise-grade, cloud-agnostic framework, running solutions across Amazon AWS, Microsoft Azure, and Google Cloud is possible without compatibility issues. Supported Programming Languages Examples of Using Language-Specific Features Now, let's look at some language-specific features. Custom libraries: Snowflake provides custom libraries for languages such as Java and many more, which makes the developer experience much more accessible. Optimization tips for Python, Java, and SQL: Data caching and batch querying can optimize performance and reduce latency. Additional optimization techniques include using compressed data formats and appropriate fetch sizes to ensure smooth and efficient data flow. Security and Compliance Built-in security features: Snowflake supports automatic encryption, network policies, and multi-factor authentication to secure data. International security compliance: Snowflake adheres to international standards in all its practices to meet regulatory requirements for data handling, including GDPR. Best practices in data privacy and security: To enhance security, organizations should adopt best practices such as regular audits, role-based access control, and continuous monitoring. Practical Implementation and Use Cases Setting up your first Snowflake environment: The step-by-step initial setup process is very user-friendly, making it accessible even for beginners. This includes setting up user roles and implementing security measures. Configure initial settings and permissions: Access can be tailored per team when configuring settings and permissions, ensuring security measures are maintained. Tips for efficient data loading and querying: Schedule bulk data loading during designated hours to avoid overloading the system, while keeping queries efficient. Cost Management and Optimization Controlling costs in Snowflake: With Snowflake, you pay for what you use, enabling effective cost management and avoiding resource overcommitment. Built-in analytic tools can track and optimize usage patterns, ensuring cost-effective operations. Optimizing Snowflake pricing: You can activate Virtual Warehouses with auto-suspend features and improve data clustering to increase query performance efficiency.
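Before moving on to analytics, here is a minimal sketch tying together two of the capabilities above, language drivers and Time Travel, using Snowflake's Node.js connector (snowflake-sdk); the account identifier, credentials, warehouse, and table name are placeholders, not values from this guide. JavaScript
// Minimal sketch: connect with the snowflake-sdk Node.js driver and run a
// Time Travel query. All identifiers and credentials below are placeholders.
const snowflake = require('snowflake-sdk');

const connection = snowflake.createConnection({
  account: 'your_account_identifier',
  username: 'your_user',
  password: 'your_password',
  warehouse: 'COMPUTE_WH',
  database: 'ANALYTICS',
  schema: 'PUBLIC',
});

connection.connect((err) => {
  if (err) {
    console.error('Unable to connect: ' + err.message);
    return;
  }
  connection.execute({
    // Query a hypothetical ORDERS table as it looked one hour ago.
    sqlText: 'SELECT * FROM orders AT(OFFSET => -3600) LIMIT 10',
    complete: (execErr, stmt, rows) => {
      if (execErr) {
        console.error('Query failed: ' + execErr.message);
      } else {
        console.log(rows);
      }
    },
  });
});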
Analytical Insights and Business Intelligence How companies use Snowflake: Companies of all sizes, from small startups to large enterprises, use Snowflake for scalable analytics, which provides their staff with actionable insights. Analytic features to facilitate smart decisions: Snowflake offers features such as data sharing and secure views, fostering a data-driven culture by empowering teams with real-time insights. Predictive analytics and machine learning: Snowflake supports predictive analytics and machine learning integration. For example, it integrates seamlessly with Spark, enabling you to incorporate AI and machine learning capabilities into your workflows. Future Outlook and Enhancements Features and Development Roadmap Continuous innovation will bring performance, security, and usability enhancements in future versions, often changing how teams work and resulting in better productivity and safety. Community and Support The Snowflake community is vibrant and active, allowing users to share ideas, solutions, and insights. Snowflake also provides extensive resources, including documentation, tutorials, and forums, to flatten the learning curve and support operational excellence. Snowflake Customization Snowflake is a flexible and powerful platform for developers to build robust custom applications that meet specific needs. End-to-end implementations and case studies of custom solutions built on the Snowflake platform show this in practice: from custom data models to bespoke analytics solutions, developers use Snowflake's features to build uniquely tailored applications. Conclusion This guide examined Snowflake's architecture, which is powerful, flexible, and user-friendly. From data storage and management to business intelligence, Snowflake addresses multiple business issues around data handling. Whether you're just starting out or already have a lot of data, Snowflake grows with you through its flexible, cost-effective solutions. Consider Snowflake a pivotal addition to your data strategy, with seamless elasticity, robust security features, and built-in support.
At a high level, bad data is data that doesn’t conform to what is expected. For example, an email address without the “@”, or a credit card expiry where the MM/YY format is swapped to YY/MM. “Bad” can also include malformed and corrupted data, such that it’s completely indecipherable and effectively garbage. In any case, bad data can cause serious issues and outages for all downstream data users, such as data analysts, scientists, engineers, and ML and AI practitioners. In this blog, we’ll take a look at how bad data may come to be, and how we can deal with it when it comes to event streams. Event Streams in Apache Kafka are predicated on an immutable log, where data, once written, cannot be edited or deleted (outside of expiry or compaction — more on this later). The benefit is that consumers can read the events independently, at their own pace, and not worry about data being modified after they have already read it. The downside is that it makes it trickier to deal with “bad data,” as we can’t simply reach in and edit it once it’s in there. In this post, we look at bad data in relation to event streams. How does bad data end up in an event stream? What can we do about it? What’s the impact on our downstream consumers, and how can we fix it? First, let’s take a look at the batch processing world, to see how they handle bad data and what we can learn from them. Bad Data in Batch Processing What is batch processing? Let’s quote Databricks, and go with: Batch Processing is a process of running repetitive, high volume data jobs in a group on an ad-hoc or scheduled basis. Simply put, it is the process of collecting, storing and transforming the data at regular intervals. Batch processing jobs typically rely on extracting data from a source, transforming it in some way, and then loading it into a database (ETL). Alternately, you can load it into the destination before you transform any of the data, in a recently trendy mode of operations known as ELT (by the way, data lake/warehouse people LOVE this pattern as they get all the $$$ from the transform). A friend and former colleague wrote more about ELTs and ETLs here, so take a look if you want to see another data expert’s evaluation. The gist, though, is that we get data from “out there” and bring it into “here”, our data lake or warehouse. In this figure, a periodic batch job kicks off, processes the data that lands in the landing table, and does something useful with it — like figure out how much money the company is owed (or how much your Datalake is costing you). Accomplishing this requires a reliable source of data — but whose job is it to ensure that the data powering the data lake is trustworthy and high quality? To cut to the chase, the data (or analytics) engineers in the data lake are responsible for getting data from across the company, pulling it in, and then sorting it out into a trustworthy and reliable format. They have little to no control over any of the changes made in production land. Data engineers typically engage in significant break-fix work keeping the lights on and the pipelines up and running. I would know, as I did this type of work for nearly 10 years. We’d typically apply schemas to the data once it lands in the data lake, meaning that changes to the source database table in production land may (likely) break the data sources in the data lake. Data engineers spend nights and weekends fixing malfunctioning data pipelines, broken just hours ago by the 5 pm database migration. 
Why are the data engineers responsible for applying schemas? The operational system owners have historically had no responsibility for data once it has crossed out of the source application boundary. Additionally, the data engineers taking the data out of the operational system are performing a “smash ‘n grab”, taking the data wholesale from the underlying database tables. It’s no surprise then that the operational team, with no responsibility for data modeling outside of their system, causes a breakage through a perfectly reasonable database change. The following figure shows how a broken ETL job (1) can cause bad data to show up in the Landing table (2), eventually resulting in (3) bad results. The exact reason for a broken ETL can vary, but let’s just say in this case it’s due to a change in types in the source database table (an int is now a string) that causes type-checking errors downstream. Once the data engineers spring from their beds and leap into action at 3 in the morning (when all data problems naturally occur), they can proceed to fix the ETL (4) by adding logic to handle the unexpected change. Next, they reprocess the failed batch to fix the landing table data (5), then rerun the job (6) that recomputes the results table (7) for the affected rows. For instance, say (7 — above) is a hive-declared table containing daily sales aggregate. I’m only going to delete and recompute the aggregates that I know (or think) are incorrect. I won’t drop the whole table and delete all the data if only a known subset is affected. I’ll just surgically remove the bad days (eg, April 19 to 21, 2024), reprocess (6 — above) for that time range, and then move on to reprocessing the affected downstream dependencies. Batch processing relies extensively on cutting out bad data and selectively replacing it with good data. You reach right into that great big data set, rip out whatever you deem as bad, and then fill in the gap with good data — via reprocessing or pulling it back out of the source. Bad Data Contaminated Data Sets The jobs that are downstream of this now-fixed data set must also be rerun, as their own results are also based on bad input data. The downstream data sets are contaminated, as are any jobs that run off of a contaminated data set. This is actually a pretty big problem in all of data engineering, and is why tools like dbt, and services like Databrick’s Delta Tables and Snowflake’s Dynamic Tables are useful — you can force recomputation of all dependent downstream jobs, including dumping bad data sets and rebuilding them from the source. But I digress. The important thing to note here is that once you get bad data into your data lake, it spreads quickly and easily and contaminates everything it touches. I’m not going to solve this problem here for you in this blog, however, but I do want you to be aware that it’s not always as simple as “cut out the bad, put in the good!” for fixing bad data sets. The reality is a lot messier, and that’s a whole other can of worms that I’m just going to acknowledge as existing and move on. Incremental Batch Processing One more word about processing data in batches. Many people have correctly figured out that it’s cheaper, faster, and easier to process your data in small increments, and named it incremental processing. An incremental processing job reads in new data and then applies it to its current state based on its business logic. 
For example, computing the most popular advertisements in 2024 would simply require a running tally of (advertisementId, clickCount), merging in the new events as they arrive. However, let’s say that you had bad data as input to your incremental job — say we’ve incorrectly parsed some of the click data and attributed them to the wrong advertisementId. To fix our downstream computations we’d have to issue unclick data, telling them “remove X clicks from these ads, then add X clicks to these other ads”. While it’s possible we could wire up some code to do that, the reality is we’re going to keep it simple: Stop everything, blow all the bad data away, rebuild it with good data, and then reprocess all the jobs that were affected by it. “Hold on”, you might say. “That’s nonsense! Why not just code in removals in your jobs? It can’t be that hard”. Well… kinda. For some jobs with no dependencies, you may be correct. But consider a moment if you have a process computing state beyond just simple addition and subtraction, as pretty much all businesses do. Let’s say you’re computing taxes owed for a corporation, and you’re dealing with dozens of different kinds of data sets. The logic for generating the final state of your corporate taxes is winding, treacherous, and not easily reversible. It can be very challenging and risky to code mechanisms to reverse every conceivable state, and the reality is that there will be cases that you simply don’t foresee and forget to code. Instead of trying to account for all possible reversal modes, just do what we do with our misbehaving internet routers. Just unplug it, wipe the state, and start it over. Heck, even dbt encourages this approach for misbehaving incremental jobs, calling it a full_refresh. Here are the important takeaways as we head into the streaming section. There is little prevention against bad data: The data engineering space has typically been very reactive. Import data of any and all quality now, and let those poor data engineers sort it out later. Enforced schemas, restrictions on production database migrations, and formalized data contracts between the operations and data plane are rarely used. The batch world relies on deleting bad data and reprocessing jobs: Data is only immutable until it causes problems, then it’s back to the drawing board to remutate it into a stable format. This is true regardless of incremental or full refresh work. I am often asked, “How do we fix bad data in our Kafka topic?” This is one of the big questions I myself asked as I got into event streaming, as I was used to piping unstructured data into a central location to fix up after the fact. I’ve definitely learned a lot of what not to do over the years, but the gist is that the strategies and techniques we use for batch-processed data at rest don’t transfer well to event streams. For these, we need a different set of strategies for addressing bad data. But before we get to those strategies, let’s briefly examine what happens to your business when you have bad data in your system. Beware the Side Effects of Processing Bad Data Bad data can lead to bad decisions, both by humans and by services. Regardless of batch processing or streaming, bad data can cause your business to make incorrect decisions. Some decisions are irreversible, but other decisions may not be. For one, reports and analytics built on bad data will disagree with those built on good data. Which one is wrong? 
While you’re busy trying to figure it out, your customer is losing confidence in your business and may choose to pull out completely from your partnership. While we may call these false reports a side effect, in effect, they can seriously affect the affectations of our customers. Alternatively, consider a system that tabulates vehicle loan payments, but incorrectly flags a customer as non-paying. Those burly men that go to repossess the vehicle don’t work for free, and once you figure out you’ve made a mistake, you’ll have to pay someone to go give it back to them. Any decision-making that relies on bad data, whether batch or streaming, can lead to incorrect decisions. The consequences can vary from negligible to catastrophic, and real costs will accrue regardless of if it’s possible to issue corrective action. You must understand there can be significant negative impacts from using bad data in stream processing, and only some results may be reversible. With all that being said, I won’t be able to go into all of the ways you can undo bad decisions made by using bad data. Why? Well, it’s primarily a business problem. What does your business do if it makes a bad decision with bad data? Apologize? Refund? Partial Refund? Take the item back? Cancel a booking? Make a new booking? You’re just going to have to figure out what your business requirements are for fixing bad data. Then, you can worry about the technology to optimize it. But let’s just get to it and look at the best strategies for mitigating and dealing with bad data in event streams. I’m Streaming My Life Away Event streams are immutable (aside from compaction and expiry). We can’t simply excise the bad data and inject corrected data into the space it used to occupy. So what else can we do? The most successful strategies for mitigating and fixing bad data in streams include, in order: Prevention: Prevent bad data from entering the stream in the first place: Schemas, testing, and validation rules. Fail fast and gracefully when data is incorrect. Event design: Use event designs that let you issue corrections, overwriting previous bad data. Rewind, rebuild, and retry: When all else fails. In this blog, we’re going to look primarily at prevention, covering the remaining strategies in a follow-up post. But to properly discuss these solutions, we need to explore what kind of bad we’re dealing with and where it comes from. So let’s take a quick side trip into the main types of bad data you can expect to see in an event stream. The Main Types of Bad Data in Event Streams As we go through the types, you may notice a recurring reason for how bad data can get into your event stream. We’ll revisit that at the end of this section. 1. Corrupted Data The data is simply indecipherable. It’s garbage. It turned into a sequence of bytes with no possible way to retrieve the original data. Data corruption is relatively rare but may be caused by faulty serializers that convert data objects into a plain array of bytes for Kafka. Luckily, you can test for that. 2. Event Has No Schema Someone has decided to send events with no schema. How do you know what’s “good data” and what’s “bad data”, if there are no structure, types, names, requirements, or limitations? 3. Event Has an Invalid Schema Your event’s purported schema can’t be applied to the data. For example, you’re using the Confluent Schema Registry with Kafka, but your event’s Schema Id doesn’t correspond to a valid schema. 
It is possible you deleted your schema, or that your serializer has inserted the wrong Schema Id (perhaps for a different schema registry, in a staging or testing environment?). 4. Incompatible Schema Evolution You're using a schema (hooray!), but the consumer cannot convert the schema into a suitable format. The event is deserializable, but not mappable to the schema that the consumer expects. This is usually because your source has undergone breaking schema evolution (note that evolution rules vary per schema type), but your consumers have not been updated to account for it. 5. Logically Invalid Value in a Field Your event has a field with a value that should never occur. For example, an array of integers for "first_name", or a null in a field that should never be null, the classic NullPointerException waiting to happen (see below). This error type arises when you are not using a well-defined schema, but simply a set of implicit conventions. It can also arise if you are using an invalid, incomplete, old, or homemade library for serialization that ignores parts of your serialization protocol. 6. Logically Valid but Semantically Incorrect These types of errors are a bit trickier to catch. For example, you may have a serializable string for a "first_name" field (good!), but the name is "Robert'); DROP TABLE Students; --". While little Bobby Tables here is a logically valid answer for a first_name field, it is highly improbable that this is another one of Elon Musk's kids. The data in the entry may even be downright damaging. The following shows an event with a negative "cost". What is the consumer supposed to do with an order where the cost is negative? This could be a case of a simple bug that slipped through into production, or something more serious. But since it doesn't meet expectations, it's bad data. Some event producer systems are more prone to these types of errors. For example, a service that parses and converts NGINX server logs or customer-submitted YAML/XML files of product inventory into individual events. Malformed sources may be partially responsible for these types of errors. 7. Missing Events This one is pretty easy. No data was produced, but there should have been something. Right? The nice thing about this type of bad data is that it's fairly easy to prevent via testing. However, it can have quite an impact if only some of the data is missing, making it harder to detect. More on this in a bit. 8. Events That Should Not Have Been Produced There is no undo button to call back an event once it is published to an event stream. We can fence out one source of duplicates with idempotent production, meaning that intermittent failures and producer retries won't accidentally create duplicates. However, we cannot fence out duplicates that are logically indistinguishable from other events. These types of bad events are typically created due to bugs in your producer code. For example, you may have a producer that creates a duplicate of: An event that indicates a change or delta ("add 1 to sum"), such that an aggregation of the data leads to an incorrect value. An analytics event, such as tracking which advertisements a user clicked on. This will also lead to an overinflation of engagement values. An e-commerce order with its own unique order_id (see below). It may cause a duplicate order to be shipped (and billed) to a customer. While there are likely more types of bad data in event streams that I may have missed, this should give you a good idea of the types of problems we typically run into.
Now let's look at how we can solve these types of bad data, starting with our first strategy: Prevention. Preventing Bad Data With Schemas, Validation, and Tests Preventing the entry of bad data into your system is the number one approach to making your life better. Diet and exercise are great, but there's no better feeling than watching well-structured data seamlessly propagate through your systems. First and foremost are schemas. Confluent Schema Registry supports Avro, Protobuf, and JSON Schema. Choose one and use it (I prefer Avro and Protobuf myself). Do yourself, your colleagues, and your future selves a favor. It's the best investment you'll ever make. There are also other schema registries available, though I personally have primarily used the Confluent one over the years (and also, I work at Confluent). But the gist is the same — make it easy to create, test, validate, and evolve your event schemas. Preventing Bad Data Types 1–5 With Schemas and Schema Evolution Schemas significantly reduce your error incident rates by preventing your producers from writing bad data, making it far easier for your consumers to focus on using the data instead of making best-effort attempts to parse its meaning. Schemas form a big part of preventing bad data, and it's far, far, far easier to simply prevent bad data from getting into your streams than it is to try to fix it after the damage has already started. JSON is a lightweight data-interchange format. It is a common yet poor choice for events, as it doesn't enforce types, mandatory and optional fields, default values, or schema evolution. While JSON has its uses, it's not for event-driven architectures. Use an explicitly defined schema such as Avro, Protobuf, or JSON Schema. Going schemaless (aka using JSON) is like going around naked in public. Sure, you're "free" of the constraints, boundaries, and limitations, but at what expense? Everyone else has to figure out what the hell is going on, and chaos (and the police) will follow. But reeling back in the hyperbole, the reality is that your consumers need well-defined data. If you send data with a weak or loose schema, it just puts the onus on the consumer to try to figure out what you actually mean. Let's say we have 2 topics with no schemas and 4 consumers consuming them. So many chances to screw up the data interpretation! There are 8 possible chances that a consumer will misinterpret the data from an event stream. And the more consumers and topics you have, the greater the chance they misinterpret data compared to their peers. Not only will your consumers get loud, world-stopping exceptions, but they may also get silent errors — miscalculating sums and misattributing results, leading to undetected divergence of consumer results. These discrepancies regularly pop up in data engineering, such as when one team's engagement report doesn't match the other team's billing report due to divergent interpretations of unstructured data. It's worth contrasting this multi-topic, multi-consumer approach with the typical ETL/ELT pipeline into the data plane. In this streaming model, we're not differentiating who uses the data for what purposes. A consumer is a consumer. In contrast, with ETLs, we're typically moving data into one major destination, the data lake (or warehouse), so it's a lot easier to apply a schema to the data after it lands but before any dependent jobs consume it. With streaming, once it's in the stream, it's locked in.
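To ground this, here is a minimal sketch of producer-side validation, assuming a hypothetical 'orders' topic and using the ajv JSON Schema validator as a stand-in for whatever schema tooling you adopt; in practice you would more likely use Avro or Protobuf with a schema registry, as argued above. JavaScript
// Minimal sketch: validate an event against an explicit schema before it is
// ever produced, so bad data never reaches the immutable stream. Topic name,
// schema, and broker address are illustrative placeholders.
const Ajv = require('ajv');
const { Kafka } = require('kafkajs');

const ajv = new Ajv();
const validateOrder = ajv.compile({
  type: 'object',
  required: ['order_id', 'first_name', 'cost'],
  properties: {
    order_id: { type: 'string' },
    first_name: { type: 'string' },
    cost: { type: 'number', minimum: 0 }, // rejects the "negative cost" case
  },
  additionalProperties: false,
});

const kafka = new Kafka({ clientId: 'order-producer', brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function produceOrder(order) {
  if (!validateOrder(order)) {
    // Fail fast and loudly instead of writing bad data into the stream.
    throw new Error('Invalid order event: ' + ajv.errorsText(validateOrder.errors));
  }
  await producer.send({
    topic: 'orders',
    messages: [{ key: order.order_id, value: JSON.stringify(order) }],
  });
}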
Implicit schemas, historical conventions, and tribal knowledge are unsuitable for providing data integrity. Use a schema, make it strict, and reduce your consumer's exposure to unintentional data issues. Once adopted, you can rely on your CI/CD pipelines to perform schema, data, and evolution validation before deploying. The result? No more spewing bad data into your production streams. Data Quality Rules: Handling Type 6 (Logically Valid but Semantically Incorrect) While many of the "bad data" problems can be avoided by using schemas, they are only a partial solution for this type. Sure, we can enforce the correct type (so no more storing Strings in Integer fields), but we can't guarantee the specific semantics of the data. So what can we do, aside from using a schema? Write producer unit tests. Throw exceptions if data is malformed (e.g., if the phone number is longer than X digits). Rely on data contracts and data quality rules (note: JSON Schema also has some built-in data quality rules). Here's an example of a Confluent data quality rule for a US Social Security Number (SSN). JSON
{
  "schema": "…",
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsnLen",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "size(message.ssn) == 9"
      }
    ]
  }
}
This rule enforces an exact length of 9 characters for the SSN. If it's an Integer, we could also enforce that it must be positive, and if it is a string, it must only contain numeric characters. The data quality checks are applied when the producer attempts to serialize data into a Kafka record. If the message.ssn field is not exactly 9 characters in length, then the serializer will throw an exception. Alternatively, you can also send the record to a dead-letter queue (DLQ) upon failure. Approach DLQ usage with caution. Simply shunting the data into a side stream means that you'll still have to deal with it later, typically by repairing it and resending it. DLQs work best where each event is completely independent, with no relation to any other event in the stream, and ordering is not important. Otherwise, you run the risk of presenting an error-free yet incomplete stream of data, which can also lead to its own set of miscalculations and errors! Don't get me wrong. DLQs are a good choice in many scenarios, but they should truly be a last-ditch effort at preventing bad data from getting into a stream. Try to ensure that you test, trial, and foolproof your producer logic to publish your record to Kafka correctly the first time. Testing — Handling Types 7 (Missing Data) and 8 (Data That Shouldn't Have Been Produced) Third in our trio of prevention heroes is testing. Write unit and integration tests that exercise your serializers and deserializers, including schema formats (validate against your production schema registry), data validation rules, and the business logic that powers your applications. Integrate producer testing with your CI/CD pipeline so that your applications go through a rigorous evaluation before they're deployed to production. Both Type 7: Missing Data and Type 8: Data that shouldn't have been produced are actually pretty easy to test against. One of the beautiful things about event-driven systems is that it's so easy to test them. For integration purposes, you simply produce events on the inputs, wait to see what comes out of the outputs, and evaluate accordingly. Once you find the bug in your logic, write another test to ensure that you don't get a regression. Summary Bad data can creep into your data sets in a variety of ways.
Data at rest consists of partitioned files typically backed by a Parquet or ORC format. The data is both created and read by periodically executed batch processes. The files are mutable, which means that bad data can be fixed in place and overwritten, or it can be deleted and regenerated by the upstream batch job. Kafka topics, in contrast, are immutable. Once a bad event is written into the event stream, it cannot be surgically removed, altered, or overwritten. Immutability is not a bug but a feature — every consumer gets the same auditable data. But this feature requires you to be careful and deliberate about creating your data. Good data practices prevent you from getting into trouble in the first place. Write tests, use schemas, use data contracts, and follow schema evolution rules. After your initial investment, you and your colleagues will save so much time, effort, and break-fix work that you’ll actually have time to do some of the fun stuff with data — like getting actual work done. Prevention is the single most effective strategy for dealing with bad data. Much like most things in life, an ounce of prevention is worth a pound of cure (or 28.3g of prevention and 454g of cure for those of us on the metric system). In the next post, we’ll take a look at leveraging event design as part of handling bad data in event streams. There are many ways you can design your events, and we’ll look at a few of the most popular ways and their tradeoffs. Don’t touch that dial, we’ll be right back (yes, TVs used to have dials, and needed time to warm up).
Real-time data is no longer a nice-to-have, but a must-have when creating relevant and engaging user experiences. Most industries today have grown accustomed to consuming instant updates, so if you're a front-end developer looking to break into real-time app development, you'll need to master the flow of real-time data. As a developer advocate at Redpanda, my job is to help developers leverage streaming data in their applications. Part of that involves introducing developers to better technologies and showcasing them in practical and fun use cases. So, in this post, I'll demonstrate how I used three modern technologies — Redpanda Serverless, Pusher, and Vercel — to create a compelling real-time frontend application, which I hope will spark your own ideas for how you can implement this powerful trio in your world.

The Cupcake Conundrum

Imagine a bustling cupcake business in NYC. To reel in customers in such a competitive market, they'll need to make their location readily visible to nearby cupcake fans. They'll also need to engage their customers via immediate feedback to build trust, enhance the overall user experience, and drive repeat business and customer loyalty. However, developing real-time applications has traditionally been difficult because most web applications are designed to respond to user inputs rather than continuously listen for and process incoming data streams. The latter requires a robust and complex infrastructure to manage persistent connections and handle high volumes of data with minimal latency. For the visual learners, there's a quick video explaining the use case.

Selecting the Right Technologies

I chose Redpanda Serverless as the streaming data platform since traditional streaming data solutions, like Apache Kafka®, can be complex and resource-intensive, making them a massive hurdle for teams with limited time and resources. Some considerations when running the platform:

Eliminates infrastructure overhead: It manages the backbone of streaming data, allowing me to focus on application logic.
Simplifies scalability: Effortlessly scales with my application's needs, accommodating spikes in data without manual intervention.
Reduces time to market: With a setup time of seconds, it speeds up development for quicker iterations and feedback.
Pay as you grow: It adapts to my usage, ensuring costs align with my actual data processing needs, which is ideal for startups and small projects.

This takes care of the complex infrastructure for dealing with high volumes of data and the low latency that's expected of real-time applications. Now, I need to establish a single, long-lived connection between the browser and the server, typically done through WebSocket, which sets up a full-duplex communication channel over a single TCP connection (established via an HTTP upgrade). This allows the server to push updates to the browser client without needing periodic requests. However, Vercel's serverless functions don't support WebSocket connections, so I needed an alternative solution. Here's where Pusher pops up. Pusher lets me create real-time channels between the server and client, hiding the complexity associated with managing WebSocket connections directly. When deploying real-time frontend applications, Vercel stands out for its seamless Git repository integration that makes deployments easy. With every push, changes are deployed automatically, and I can pull in website statistics and data from other solutions (like databases) when needed.
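The walkthrough below focuses on consuming from the topic, so as a point of reference, here is a minimal, hypothetical sketch of the other half of the pipeline: a producer publishing cupcake inventory updates to the inv-count topic created later in Step 1. It assumes the kafkajs client and my own environment variable names (REDPANDA_BROKERS, REDPANDA_USERNAME, REDPANDA_PASSWORD); Redpanda Serverless clusters typically authenticate with SASL/SCRAM over TLS, but verify the exact connection settings against your cluster.

JavaScript

// Minimal producer sketch (my own illustration, not from the original app).
// Assumes: npm install kafkajs, plus the REDPANDA_* environment variables.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'cupcake-inventory-producer',
  brokers: [process.env.REDPANDA_BROKERS], // e.g., the Serverless bootstrap address
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-256',            // verify against your cluster settings
    username: process.env.REDPANDA_USERNAME,
    password: process.env.REDPANDA_PASSWORD,
  },
});

async function publishInventoryUpdate(update) {
  const producer = kafka.producer();
  await producer.connect();
  // One JSON event per stock change, keyed by store so per-store ordering holds.
  await producer.send({
    topic: 'inv-count',
    messages: [{ key: update.store, value: JSON.stringify(update) }],
  });
  await producer.disconnect();
}

// Example: store "store_001" now has 12 blueberry and 7 strawberry cupcakes.
publishInventoryUpdate({ store: 'store_001', blueberry: 12, strawberry: 7 })
  .catch(console.error);

The field names (store, blueberry, strawberry) match what the front end reads later; the rest of the wiring is an assumption for illustration.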
Preparing the Application

In my application, mapview.js acts as a Vercel serverless function, which plays the most important role by consuming data from the topic I created in Redpanda Serverless and then updating the inventory status. Before using Pusher to relay these updates to the front end, the serverless function maps the store IDs in store_nyc.csv to their physical locations and then adds the location information (latitude and longitude) that the client needs to render.

JavaScript

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const messageData = JSON.parse(message.value.toString());
    const location = storeLocations[messageData.store];
    const { store, ...rest } = messageData;
    for (let store in inventory) {
      inventory[store].latest = false;
    }
    inventory[messageData.store] = { ...rest, ...location, latest: true };
    try {
      pusher.trigger("my-channel", channelId, JSON.stringify(inventory));
    } catch (error) {
      console.error('Error:', error);
    }
  },
})

Note: Vercel serverless functions have a maximum duration limit, which varies depending on your subscription plan, so I set MAX_BLOCK_TIME to five seconds. The Pro plan allows up to 300 seconds of execution for a better user experience.

JavaScript

await new Promise(resolve =>
  setTimeout(resolve, MAX_BLOCK_TIME)
);

On the front end, index.html renders the real-time map with the LeafletJS library and applies the inventory updates, giving end users a dynamic and interactive experience.

JavaScript

channel.bind('cupcake-inv', function(data) {
  var inventory = data;
  tableBody.innerHTML = '';
  for (var store in inventory) {
    var storeData = inventory[store];
    if (markers[store]) {
      markers[store].setLatLng([storeData.lat, storeData.lng])
        .setPopupContent(`<b>${storeData.store}</b><br>Blueberry: ${storeData.blueberry}<br>Strawberry: ${storeData.strawberry}`);
    } else {
      markers[store] = L.marker([storeData.lat, storeData.lng]).addTo(map)
        .bindPopup(`<b>${storeData.store}</b><br>Blueberry: ${storeData.blueberry}<br>Strawberry: ${storeData.strawberry}`);
    }
  }
});

It also generates a unique session ID per session to create channels in Pusher, so each session has its own channel on which to receive updates.

JavaScript

channel.bind(uniqueChannelId, function(data) {
  var inventory = data;
  for (var store in inventory) {
    var storeData = inventory[store];
    ……

document.addEventListener('DOMContentLoaded', () => {
  fetch(`/api/mapview?channelId=${encodeURIComponent(uniqueChannelId)}`)

The Recipe: Real-Time Cupcake Updates With Redpanda Serverless, Vercel, and Pusher

It's time to start cooking! Here's a step-by-step breakdown of how I brought this vision to life, which you can follow. If you want to skip ahead, you can find all the code in this GitHub repository.

Step 1: Set up Redpanda Serverless

Sign up and create the cluster: After signing up, click the Create Cluster button and select a region close to your workload, ensuring low latency for your data.
Create the user and set permissions: Under the Security tab, create a new user and set the necessary permissions.
Create the topic: Create a topic called inv-count that's dedicated to tracking cupcake stock updates.

Step 2: Integrate Pusher for Real-Time Updates

Register the application: After creating an app within Pusher, copy the application credentials, including the app_id, key, secret, and cluster information, and store them for use in your application (a minimal sketch of wiring these up follows below).
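For reference, here is a minimal, hypothetical sketch of how those credentials might be wired up, assuming the official pusher (server) and pusher-js (browser) packages and my own environment variable names (PUSHER_APP_ID, PUSHER_KEY, PUSHER_SECRET, PUSHER_CLUSTER); the actual repository may organize this differently.

JavaScript

// Server side (e.g., inside the Vercel function): initialize the Pusher client
// from environment variables so secrets stay out of the codebase.
// Assumes: npm install pusher
const Pusher = require('pusher');

const pusher = new Pusher({
  appId: process.env.PUSHER_APP_ID,
  key: process.env.PUSHER_KEY,
  secret: process.env.PUSHER_SECRET,
  cluster: process.env.PUSHER_CLUSTER,
  useTLS: true,
});

// Browser side (e.g., in index.html): subscribe with the public key only.
// Assumes pusher-js is loaded via a <script> tag or a bundler.
// const pusherClient = new Pusher('<public-key>', { cluster: '<cluster>' });
// const channel = pusherClient.subscribe('my-channel');
// channel.bind(uniqueChannelId, (data) => { /* update the map and table */ });

Only the public key and cluster belong in the browser; the app_id and secret should stay server-side as environment variables, which ties directly into the Vercel configuration in the next step.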
Step 3: Deploy With Vercel

Integrate with GitHub: Push the updated codebase to a GitHub repository, ensuring your changes are version-controlled and ready for deployment.
Import and set up the project in Vercel: Navigate to Vercel and import the project by selecting the "cupcakefanatic" repository. Specify cupcake-pusher as the root directory for the deployment.
Configure the environment: Enter the project-specific environment variables.

With that, I can establish a seamless real-time connection between the server and clients, enhancing the store's online presence and user engagement — without the heavy lifting traditionally associated with real-time streaming data. Below is a screenshot of the resulting real-time data in our cupcake app.

With the winning combination of Redpanda Serverless, Pusher, and Vercel, I easily created a dynamic, responsive application that keeps customers informed and engaged with live inventory updates. If you have questions, ask me in the Redpanda Community on Slack; I'm there most of the time. :)