Data is at the core of software development: information stored in anything from text documents and images to entire software programs. These bits of information need to be processed, read, analyzed, stored, and transported throughout systems. In this Zone, you'll find resources covering the tools and strategies you need to handle data properly.
The batch pipeline, in my opinion, is not going away. The system still needs it for business intelligence applications and data visualization. These reports often involve processing significant historical data. While iterative processing could be considered, it's likely that with advancements in processing power and machine learning algorithms, there will always be a need to rebuild the data model from scratch. This approach will ensure the agility of the data layer and help solve the challenges we covered in our previous post.

Difficulty Accessing Data
Data engineers can retrieve data in real time by accessing the streaming network and the operational database. A more stateless pipeline that caters to special data shape demands and processes data as it arrives keeps the overall pipeline less complex, and sensitive data can be masked before it is distributed.

Noisy and Dirty Data
Validating data for its shape and context on the fly avoids future problems. Checks for duplication and mismatches can be distributed and processed before the data is saved to the data warehouse, which keeps unclean data out.

Performance
A high-throughput, reliable streaming platform capable of quickly retrieving historical events avoids pipeline congestion and processing bottlenecks. Stateless pipelines can scale out horizontally, and stateful pipelines can scale by distributing jobs.

Troubleshooting
With data modeled and available from different streams, it's easier to detect problems by monitoring and sampling them with alerts. It is simpler to isolate problematic data when the affected pipelines consume from the same streaming endpoint, and data is pre-cleaned before it enters the data warehouse.

Summary
When it comes to generating datasets for training machine learning models, streaming data is better suited for continuous training and testing. However, it can be challenging to prepare datasets for ML model training from the different types of data stores that were introduced throughout the years. Implementing a proven data strategy can streamline your troublesome data pipelines for real-time data ingestion, efficient processing, and seamless integration of disparate systems.

Next, I'll walk you through a use case where we implement these data strategies to leverage generative AI for an insurance claim. We'll also use Redpanda as our streaming data platform: a simpler, more performant, and cost-effective Kafka alternative. This use case will be nicely packaged in a free, downloadable report (coming soon!), so make sure you subscribe to our newsletter to be the first to know. In the meantime, if you have questions about this topic or need support getting started with Redpanda for your own AI use case, you can chat with me in the Redpanda Community on Slack.
Knowledge graphs are a giant web of information in which elements and ideas are linked to show how they relate in the real world. This goes beyond databases that just store information: knowledge graphs also store the connections between pieces of information. This makes knowledge graphs very useful in various fields. Here are a few examples:

Search engines: Search engines use knowledge graphs to understand the relationships between search terms and real-world entities. A search for "French food" might not just surface recipes, but also information about French wine regions or famous French chefs, thanks to the connections embodied in the knowledge graph.
Virtual assistants: Virtual assistants like Siri or Alexa rely on knowledge graphs to understand your requests and provide helpful responses. By knowing that "Eiffel Tower" is a landmark and "Paris" is a city, the assistant can answer your question about the Eiffel Tower's location.
Machine learning applications: Machine learning algorithms can leverage knowledge graphs to improve their understanding of the world. A recommendation system, for example, can use a knowledge graph to connect movies with actors, directors, and genres. This allows it to recommend similar movies based on past preferences.
Large Language Models (LLMs): LLMs can benefit from knowledge graphs by accessing and processing the information and connections they store. This helps LLMs generate more comprehensive and informative responses to our questions.
Fraud detection: Knowledge graphs can be used to identify fraudulent activity by analyzing connections between entities. For example, a graph might flag a transaction as suspicious if it involves a new account linked to a known fraudulent IP address.

Knowledge Graph Basics
In a library, books may not just be shelved by category, but also cross-referenced. A book on Paris might be near French history books, but also connected to travel guides and works by Parisian authors. This web of connections is the essence of a knowledge graph. The basic building blocks of a knowledge graph include:

Nodes: These are the fundamental entities in the graph. They can be anything you can describe: physical objects (like the Eiffel Tower), abstract concepts (like democracy), events (like the French Revolution), or even people (like Marie Curie).
Edges: These are the connections between nodes. They show how entities relate to each other. Edges are often labeled to specify the nature of the connection. Going back to our Paris example, the edge between "Paris" and "France" might have the label "capital of." Other labels could be "inhabitant of" (between Paris and Marie Curie) or "influenced by" (between French Revolution and democracy).
Labels: These are crucial for understanding the edges. They provide context and meaning to the connections between nodes.
Properties: Nodes and edges can have properties, which are additional attributes or metadata associated with them. For example, a person node might have properties such as "name," "age," "gender," etc., while an edge representing the relationship "is married to" might have properties like "start date" and "end date."
Ontologies: These are blueprints for the knowledge graph. They define the types of entities allowed in the graph, the possible relationships between them, and the labels used for those relationships. In a library, again, there can be a specific classification system for books, defining sections, subsections, and how different categories of books can relate. An ontology sets the rules for how information is organized within the knowledge graph.
Schema: Based on the ontology, a schema defines the types of entities, relationships, and properties allowed in the graph. It provides structure and consistency to the data, making it easier to query and analyze.
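To make these building blocks concrete, here is a minimal sketch of the Paris example using plain Python structures. The node and edge data are illustrative; a real system would use a graph database or an RDF store rather than in-memory lists.

Python
# Nodes with properties (metadata about each entity).
nodes = {
    "Paris": {"type": "city"},
    "France": {"type": "country"},
    "Marie Curie": {"type": "person"},
    "Eiffel Tower": {"type": "landmark"},
}

# Labeled edges as (source, label, target) triples; the labels
# give meaning to each connection.
edges = [
    ("Paris", "capital of", "France"),
    ("Marie Curie", "inhabitant of", "Paris"),
    ("Eiffel Tower", "located in", "Paris"),
]

def neighbors(node, label):
    """Follow all edges with a given label out of a node."""
    return [dst for src, lbl, dst in edges if src == node and lbl == label]

print(neighbors("Paris", "capital of"))        # ['France']
print(neighbors("Eiffel Tower", "located in")) # ['Paris']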
Superpowers of a Knowledge Graph
This web of relationships unlocks a unique power: machines can reason and infer new information based on what they "know" in the graph. Here are two examples.

Reasoning and Inference: The "Aha Moment" for Machines
Assume a knowledge graph stores information like "Paris is the capital of France" and "France is in Europe." While the graph might not explicitly state "Paris is in Europe," the connections between these entities allow a machine to reason toward that conclusion. This "aha moment" is the essence of reasoning with knowledge graphs. Machines can analyze these connections and infer new information that isn't explicitly stated, expanding their understanding of the world.

Example
A travel recommendation system uses a knowledge graph to connect cities with tourist attractions and nearby landmarks. If a user expresses interest in visiting the Eiffel Tower, the system can reason using the knowledge graph and recommend exploring Paris, even if the user didn't specifically mention the city.

Interoperability: Sharing Knowledge Like a Universal Library
Knowledge graphs aren't isolated islands of information. They can be built using standardized formats, allowing different systems to understand and exchange information stored within their graphs, like a universal filing system for libraries. Each library can curate its own collection (a specific knowledge graph), but all of them can leverage information from other libraries because they follow the same organizing principles (standardized formats).

Example
A product recommendation engine in an online store uses a knowledge graph. This graph might connect products with their features, brands, and similar items. The store could then share this knowledge graph with a partner company that provides product reviews. The review company, with its own knowledge graph for user sentiment analysis, could then analyze reviews in the context of the product information from the store's knowledge graph. This can lead to more insightful recommendations for customers.

A Sample of Important Use Cases
Knowledge graphs can provide a powerful framework for systematically generating test cases by leveraging the structured representation of software components, their interactions, and domain-specific knowledge. By analyzing the graph, testers can identify critical paths, handle complexity, incorporate constraints, and automate the generation process, improving the quality and coverage of the testing effort. Let's explore some important use cases.

Modeling Software Components and Interactions
Knowledge graphs can represent components of a software system, such as modules, classes, functions, or APIs, as nodes in the graph. Edges between these nodes may represent the interactions or dependencies between the components. By analyzing these interactions, testers can identify potential test scenarios and paths through the system, as the sketch below illustrates.
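As a toy illustration of this idea (the component graph here is invented; a real system would derive it from code analysis or architecture metadata), each root-to-leaf path through the dependency edges suggests a candidate test scenario:

Python
# Hypothetical component graph: which component calls which.
calls = {
    "api_gateway": ["auth_service", "order_service"],
    "auth_service": ["user_db"],
    "order_service": ["inventory_service", "payment_service"],
    "inventory_service": [],
    "payment_service": [],
    "user_db": [],
}

def paths(component, path=()):
    """Enumerate call paths through the system; each is a test scenario."""
    path = path + (component,)
    children = calls[component]
    if not children:
        yield path
    for child in children:
        yield from paths(child, path)

for p in paths("api_gateway"):
    print(" -> ".join(p))
# api_gateway -> auth_service -> user_db
# api_gateway -> order_service -> inventory_service
# api_gateway -> order_service -> payment_service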
Incorporating Domain Knowledge
Knowledge graphs can integrate domain-specific knowledge, such as industry standards, best practices, or regulatory requirements, into the test case generation process. By incorporating domain-specific nodes and edges into the graph, testers can ensure that test cases align with domain-specific considerations and constraints.

Versioning and Change Management
Knowledge graphs can also support versioning and change management by tracking the history of requirements and test cases over time. Testers can view the evolution of requirements and their associated test cases, including when changes were made and by whom. This historical context is valuable for understanding the rationale behind changes and ensuring traceability across different iterations of the software.

Cross-Referencing Dependencies
Requirements often have dependencies on each other, and test cases may also have dependencies on multiple requirements. Knowledge graphs can capture these dependencies as edges between nodes, enabling testers to visualize and understand the interconnectedness of requirements and test cases. This can help in identifying potential conflicts or gaps in the testing coverage.

Identifying Patterns and Trends
Knowledge graphs may enable testers to identify patterns and trends in defect occurrences, such as recurring issues, common failure scenarios, or correlations between specific code changes and defects. By analyzing the graph, testers can gain insights into the root causes of defects and prioritize their investigation efforts accordingly.

Open-Source Knowledge Graphs
Some open-source knowledge graphs offer a glimpse into how these systems are structured and function. Examples include:

Wikidata: A collaborative, editable knowledge base operated by the Wikimedia Foundation
DBpedia: A knowledge graph extracted from Wikipedia
YAGO: A knowledge graph derived from Wikipedia for web search
KBpedia: An open-source knowledge graph that integrates seven leading public knowledge bases, including Wikipedia, Wikidata, schema.org, DBpedia, GeoNames, OpenCyc, and standard UNSPSC products and services. It provides a comprehensive structure for promoting data interoperability and knowledge-based artificial intelligence (KBAI). KBpedia's upper ontology (KKO) includes more than 58,000 reference concepts, mapped linkages to about 40 million entities (mostly from Wikidata), and 5,000 relations and properties. It's a flexible and computable knowledge graph suitable for various machine learning tasks.
Logseq: A knowledge graph tool that combines note-taking, outlining, and wiki functionality; it allows users to create interconnected notes and organize information in a graph-like structure.
Athens: A knowledge graph tool similar to note-taking apps like Roam Research; it allows users to create linked notes and build a network of ideas.
GraphGPT: While not a standalone knowledge graph, GraphGPT is a language model fine-tuned for generating graph-based responses. It can be used to create educational content related to knowledge graphs.
GitJournal: A knowledge graph tool that integrates with Git repositories; it allows users to create and manage notes using Git version control.
RecBole: A recommendation library that leverages knowledge graphs for personalized recommendations; it can be useful for educational scenarios related to recommendation systems.
DeepKE: A toolkit for knowledge embedding that can be used to embed entities and relations from knowledge graphs into vector representations; it's helpful for educational purposes related to graph-based machine learning.
These resources provide a valuable learning ground for understanding the fundamentals of knowledge graphs and their potential applications.

Knowledge Graphs in the Industry
There are multiple cases in the industry where companies benefit from knowledge graphs. The tech giant Google utilizes knowledge graphs extensively: its knowledge graph powers search results by understanding the relationships between entities, providing more relevant information to users. Amazon leverages knowledge graphs to enhance its recommendation systems; by analyzing user behavior and product attributes, it creates personalized recommendations for customers. Walmart uses knowledge graphs to optimize supply chain management; by modeling relationships between products, suppliers, and logistics, it improves inventory management and distribution. The ride-sharing company Lyft employs knowledge graphs to enhance route optimization and improve driver-passenger matching; by understanding geographical relationships, it optimizes travel times and reduces wait times. Airbnb's knowledge graph helps match hosts and guests based on preferences, location, and availability, enhancing the user experience by suggesting relevant listings.

Let's dive into the details of two specific cases: Allianz and eBay.

Allianz: Streamlining Regression Testing With Knowledge Graphs
German insurance giant Allianz implemented a knowledge graph system to streamline regression testing for its core insurance platform. Here's how it worked:

Knowledge Graph Construction
Allianz built a knowledge graph that captured information about the insurance platform's functionalities, user roles, data entities (policies, claims, customers), and the relationships between them.

Test Case Automation
The knowledge graph was leveraged to automate the generation of basic regression test cases. The rich network of information within the graph allowed the system to identify different testing scenarios and create corresponding test cases. This significantly reduced the manual effort required for regression testing.

Improved Test Maintenance
The knowledge graph's ability to represent changes in the system proved valuable. When updates were made to the insurance platform, the knowledge graph was easily updated to reflect these changes. This ensured that the automatically generated regression tests remained relevant and continued to cover the latest functionalities.

The results for Allianz were positive. They reported a significant reduction in regression testing time and a corresponding increase in test coverage. The knowledge graph also simplified test maintenance, allowing testers to focus on more complex scenarios.

eBay: Enhancing Test Case Design With Knowledge Graphs
E-commerce giant eBay experimented with knowledge graphs to improve the design and management of test cases for its marketplace platform. Here's a breakdown of the approach:

Mapping User Journeys
eBay used a knowledge graph to model user journeys on the platform. This included entities like buyers, sellers, products, search functionalities, and checkout processes. Relationships between these entities were carefully mapped, providing a holistic view of user interactions.

Identifying Test Coverage Gaps
By visualizing user journeys within the knowledge graph, eBay could easily identify areas where existing test cases were lacking. For example, the graph might reveal that there were no tests for a specific type of user interaction or a particular edge case scenario.
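A toy sketch of the underlying idea (not eBay's actual system; the journey edges and test mapping below are invented): once user journeys and test cases live in the same graph, a coverage gap is simply a journey edge with no covering test.

Python
# Hypothetical user-journey edges in the knowledge graph.
journey_edges = [
    ("buyer", "searches for", "product"),
    ("buyer", "adds to cart", "product"),
    ("buyer", "checks out", "order"),
    ("seller", "lists", "product"),
]

# Journey edges that are linked to at least one test case.
covered_by_tests = {
    ("buyer", "searches for", "product"),
    ("buyer", "checks out", "order"),
}

# Report every journey edge with no covering test.
for edge in journey_edges:
    if edge not in covered_by_tests:
        print("No test covers:", " ".join(edge))
# No test covers: buyer adds to cart product
# No test covers: seller lists product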
Optimizing Test Suite Design
With these gaps identified, eBay could then design new test cases to ensure comprehensive coverage of user journeys. The knowledge graph facilitated a more systematic approach to test case design, ensuring functionalities were thoroughly tested. While specific details about the outcomes are limited, eBay's experiment demonstrates the potential of knowledge graphs to improve the efficiency and effectiveness of test case design for complex software systems.

Technological Challenges
There are open issues in building and maintaining these powerful tools. From gathering and cleaning vast amounts of data to ensuring the knowledge graph stays up-to-date, there are significant challenges to overcome. Let's explore a sample of challenges in detail.

1. Data Acquisition and Cleaning
Knowledge Gathering: Building a comprehensive knowledge graph requires gathering information from diverse sources. This can be a time-consuming and resource-intensive task, especially for complex domains.
Data Quality: The accuracy and consistency of information feeding into the knowledge graph are crucial. Cleaning and filtering data to eliminate errors, inconsistencies, and duplicates can be a significant challenge.

2. Knowledge Graph Construction and Maintenance
Schema Design: Defining the structure of the knowledge graph, including the types of entities, relationships, and properties, requires careful planning. This schema should be flexible enough to accommodate new information while maintaining consistency.
Knowledge Graph Population: Populating the graph with accurate and up-to-date information can be an ongoing process. As the world changes, the knowledge graph needs to be updated to reflect these changes.

3. Integration and Interoperability
Data Integration: Knowledge graphs often need to integrate information from various sources, which can have different formats and structures. Reconciling these differences and ensuring seamless data flow can be challenging.
Interoperability: For knowledge graphs to truly unlock their potential, they need to be able to communicate and exchange information with other knowledge graphs. Standardized formats and protocols are needed to facilitate this interoperability.

4. Reasoning and Inference
Reasoning Capabilities: While knowledge graphs have the potential to reason and infer new information based on existing connections, developing robust reasoning algorithms is an ongoing area of research.
Explainability: When a knowledge graph makes an inference, it's crucial to understand the reasoning behind it. Ensuring transparency and explainability in the reasoning process is important for building trust in the system.

5. Scalability and Performance
Large Knowledge Graphs: As knowledge graphs grow in size and complexity, managing their storage, processing, and querying can become challenging. Scalable solutions are needed to handle massive amounts of information efficiently.
Query Performance: Ensuring fast and efficient retrieval of information from the knowledge graph is essential for real-world applications. Optimizing query processing techniques is an ongoing challenge.

Wrapping Up
Knowledge graphs represent a paradigm shift in software engineering and testing. By moving beyond traditional test case management approaches, knowledge graphs offer a more holistic and interconnected view of the software system.
This structured representation of information unlocks possibilities for automation, optimization, and more robust and efficient software development lifecycles. As the technology matures and the challenges are addressed, knowledge graphs are a promising candidate to become a cornerstone of modern software engineering practices.
In today's cloud computing world, all types of logging data are extremely valuable. Logs can include a wide variety of data, including system events, transaction data, user activities, web browser logs, errors, and performance metrics. Managing logs efficiently is extremely important for organizations, but dealing with large volumes of data makes it challenging to detect anomalies and unusual patterns or predict potential issues before they become critical. Efficient log management strategies, such as implementing structured logging, using log aggregation tools, and applying machine learning for log analysis, are crucial for handling this data effectively.

One of the latest advancements in effectively analyzing large amounts of logging data is the machine learning (ML)-powered analytics provided by Amazon CloudWatch. This brand-new CloudWatch capability is transforming the way organizations handle their log data, offering faster, more insightful, and automated log data analysis. This article explores how to use CloudWatch's machine learning-powered analytics to overcome the challenges of effectively identifying hidden issues within log data. Before diving into these features, let's have a quick refresher on Amazon CloudWatch.

What Is Amazon CloudWatch?
It is an AWS-native monitoring and observability service that offers a whole suite of capabilities:

Monitoring: Tracks performance and operational health.
Data collection: Gathers logs, metrics, and events, providing a comprehensive view of AWS resources.
Unified operational view: Provides insights into applications running on AWS and on-premises servers.

Challenges With Log Data Analysis

Volume of Data
There's too much log data. In this modern era, applications emit a tremendous amount of log events. Log data can grow so rapidly that developers often find it difficult to identify issues within it; it is like finding a needle in a haystack.

Change Identification
Another common problem is as old as logs themselves: identifying what has changed in your logs.

Proactive Detection
Proactive detection is another common challenge. It's great if you can use logs to dive in when an application is having an issue, find the root cause, and fix it. But how do you know when those issues are occurring? How do you proactively detect them? Of course, you can implement metrics, alarms, and so on for the issues you know about, but there's always the problem of unknowns. We often end up instrumenting observability and monitoring only for past issues.

Now, let's dive deep into the machine learning capabilities from CloudWatch that will help you overcome the challenges we have just discussed.

Machine Learning Capabilities From CloudWatch

Pattern Analysis
Imagine you are troubleshooting a real-time distributed application accessed by millions of customers globally and generating a significant amount of application logs. Analyzing tens of thousands of log events manually is challenging, and it can take forever to find the root cause. That is where the new CloudWatch machine learning-based capability can help: it quickly groups log events into patterns within the Logs Insights page of CloudWatch. It is much easier to scan a limited number of patterns and quickly filter the ones that might be interesting or relevant to the issue you are trying to troubleshoot.
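CloudWatch's pattern mining is proprietary, but the core idea can be approximated in a few lines: mask the variable tokens in each log line so that only the stable structure remains, then group identical templates. A hypothetical sketch (the log lines and masking rules are illustrative):

Python
import re
from collections import Counter

def template(line: str) -> str:
    """Mask variable tokens (numbers, hex ids) so only the structure remains."""
    line = re.sub(r"\b\d+(\.\d+)?\b", "<*>", line)   # numbers
    line = re.sub(r"\b[0-9a-f]{8,}\b", "<*>", line)  # request ids, hashes
    return line

logs = [
    "GET /orders/123 took 31 ms",
    "GET /orders/456 took 27 ms",
    "ERROR timeout calling payments after 3000 ms",
]

# Group log lines by their masked template and count occurrences.
patterns = Counter(template(line) for line in logs)
for pattern, count in patterns.most_common():
    print(count, pattern)
# 2 GET /orders/<*> took <*> ms
# 1 ERROR timeout calling payments after <*> ms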
CloudWatch also allows you to expand a specific pattern to look at the relevant events, along with related patterns that might be pertinent. In simple words, pattern analysis is the automated grouping and categorization of your log events.

Comparison Analysis
How can we elevate pattern analysis to the next level? Now that we've seen how pattern analysis works, let's see how we can extend this feature to perform comparison analysis. Comparison analysis aims to solve the second challenge: identifying log changes. It lets you profile your logs using patterns from one time period, compare them to the patterns extracted for another period, and analyze the differences. This helps answer the fundamental question of what changed in my logs. You can quickly compare your logs during a period when your application is having an issue against a known healthy period. Any change between the two time periods is a strong indicator of the possible root cause of your problem.

CloudWatch Logs Anomaly Detection
Anomaly detection, in simple terms, is the process of identifying unusual patterns or behaviors in the logs that do not conform to expected norms. To use this feature, we first select the log group for the application and enable CloudWatch Logs anomaly detection for it. At that point, CloudWatch trains a machine learning model on the expected patterns and the volume of each pattern associated with your application. CloudWatch takes five minutes to train the model using logs from your application, after which the feature becomes active and automatically starts surfacing anomalies whenever they occur. A brand-new error message that wasn't there before, a sudden spike in volume, or a spike in HTTP 400s are some examples that will result in an anomaly being generated.

Generate Logs Insights Queries Using Generative AI
With this capability, you can give natural language commands to filter log events, and CloudWatch generates the queries using generative AI. If you are unfamiliar with the CloudWatch query language or come from a non-technical background, you can easily use this feature to generate queries and filter logs. It's an iterative process; you rarely get precisely what you want from the first query, so you update and iterate the query based on the results you see. Let's look at a couple of examples.

Natural language prompt: "Check API Response Times"

Auto-generated query by CloudWatch:

fields @timestamp, @message
| parse @message "Response Time: *" as responseTime
| stats avg(responseTime)

In this query:
fields @timestamp, @message selects the timestamp and message fields from your logs.
| parse @message "Response Time: *" as responseTime parses the @message field to extract the value following the text "Response Time: " and labels it as responseTime.
| stats avg(responseTime) calculates the average of the extracted responseTime values.

Natural language prompt: "Please provide the duration of the ten invocations with the highest latency."

Auto-generated query by CloudWatch:

fields @timestamp, @message, latency
| stats max(latency) as maxLatency by @message
| sort maxLatency desc
| limit 10

In this query:
fields @timestamp, @message, latency selects the @timestamp, @message, and latency fields from the logs.
| stats max(latency) as maxLatency by @message computes the maximum latency value for each unique message.
| sort maxLatency desc sorts the results in descending order based on the maximum latency, showing the highest values at the top.
| limit 10 restricts the output to the top 10 results with the highest latency values.
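Beyond the console, the same queries can be run programmatically. A minimal sketch using boto3's Logs Insights API; the log group name and time range here are illustrative:

Python
import time
import boto3

logs = boto3.client("logs")

# Start the query; Logs Insights queries run asynchronously.
started = logs.start_query(
    logGroupName="/my-app/application",  # illustrative log group name
    startTime=int(time.time()) - 3600,   # last hour
    endTime=int(time.time()),
    queryString=(
        "fields @timestamp, @message "
        '| parse @message "Response Time: *" as responseTime '
        "| stats avg(responseTime)"
    ),
)

# Poll until the query finishes, then print each result row.
while True:
    results = logs.get_query_results(queryId=started["queryId"])
    if results["status"] not in ("Scheduled", "Running"):
        break
    time.sleep(1)

for row in results["results"]:
    print({field["field"]: field["value"] for field in row})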
Either way, these queries filter the log events from the application logs and extract specific information, such as identifying errors, monitoring performance metrics, or tracking user activities. The exact query syntax might vary based on the particular log format and the information you seek.

Conclusion
CloudWatch's machine learning features offer a robust solution for managing the complexities of log data. These tools make log analysis more efficient and insightful, from automating pattern analysis to enabling anomaly detection. The addition of generative AI for query generation further democratizes access to these powerful insights.
Google BigQuery is a powerful cloud-based data warehousing solution that enables users to analyze massive datasets quickly and efficiently. In Python, BigQuery DataFrames provide a Pythonic interface for interacting with BigQuery, allowing developers to leverage familiar tools and syntax for data querying and manipulation. In this comprehensive developer guide, we'll explore the usage of BigQuery DataFrames, their advantages, disadvantages, and potential performance issues.

Introduction To BigQuery DataFrames
BigQuery DataFrames serve as a bridge between Google BigQuery and Python, allowing seamless integration of BigQuery datasets into Python workflows. With BigQuery DataFrames, developers can use familiar libraries like Pandas to query, analyze, and manipulate BigQuery data. This Pythonic approach simplifies the development process and enhances productivity for data-driven applications.

Advantages of BigQuery DataFrames
Pythonic Interface: BigQuery DataFrames provide a Pythonic interface for interacting with BigQuery, enabling developers to use familiar Python syntax and libraries.
Integration With Pandas: Being compatible with Pandas, BigQuery DataFrames allow developers to leverage the rich functionality of Pandas for data manipulation.
Seamless Query Execution: BigQuery DataFrames handle the execution of SQL queries behind the scenes, abstracting away the complexities of query execution.
Scalability: Leveraging the power of Google Cloud Platform, BigQuery DataFrames offer scalability to handle large datasets efficiently.

Disadvantages of BigQuery DataFrames
Limited Functionality: BigQuery DataFrames may lack certain advanced features and functionalities available in native BigQuery SQL.
Data Transfer Costs: Transferring data between BigQuery and Python environments may incur data transfer costs, especially for large datasets.
API Limitations: While BigQuery DataFrames provide a convenient interface, they may have limitations compared to directly using the BigQuery API for complex operations.

Prerequisites
Google Cloud Platform (GCP) Account: Ensure an active GCP account with BigQuery access.
Python Environment: Set up a Python environment with the required libraries (pandas, pandas_gbq, and google-cloud-bigquery).
Project Configuration: Configure your GCP project and authenticate your Python environment with the necessary credentials.
Using BigQuery DataFrames

Install Required Libraries
Install the necessary libraries using pip:

Shell
pip install pandas pandas-gbq google-cloud-bigquery

Authenticate GCP Credentials
Authenticate your Python environment with Application Default Credentials to enable interaction with BigQuery:

Python
import google.auth

# Load GCP Application Default Credentials
credentials, project_id = google.auth.default()

Querying BigQuery DataFrames
Use pandas_gbq to execute SQL queries and retrieve results as a DataFrame:

Python
import pandas_gbq

# SQL query
query = "SELECT * FROM `your_project_id.your_dataset_id.your_table_id`"

# Execute the query and retrieve a DataFrame
df = pandas_gbq.read_gbq(query, project_id="your_project_id", credentials=credentials)

Writing to BigQuery
Write a DataFrame to a BigQuery table using pandas_gbq:

Python
# Write the DataFrame to BigQuery
pandas_gbq.to_gbq(
    df,
    destination_table="your_project_id.your_dataset_id.your_new_table",
    project_id="your_project_id",
    if_exists="replace",
    credentials=credentials,
)

Advanced Features

SQL Parameters
Pass parameters to your SQL queries dynamically. Named query parameters are supplied through the BigQuery job configuration:

Python
query = """
    SELECT * FROM `your_project_id.your_dataset_id.your_table_id`
    WHERE column_name = @param_name
"""
configuration = {
    "query": {
        "parameterMode": "NAMED",
        "queryParameters": [
            {
                "name": "param_name",
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": "param_value"},
            }
        ],
    }
}
df = pandas_gbq.read_gbq(
    query,
    project_id="your_project_id",
    credentials=credentials,
    dialect="standard",
    configuration=configuration,
)

Schema Customization
Customize the DataFrame schema during the write operation:

Python
schema = [
    {"name": "column_name", "type": "INTEGER"},
    {"name": "another_column", "type": "STRING"},
]
pandas_gbq.to_gbq(
    df,
    destination_table="your_project_id.your_dataset_id.your_custom_table",
    project_id="your_project_id",
    if_exists="replace",
    credentials=credentials,
    table_schema=schema,
)

Performance Considerations
Data Volume: Performance may degrade with large datasets, especially when processing and transferring data between BigQuery and Python environments.
Query Complexity: Complex SQL queries may lead to longer execution times, impacting overall performance.
Network Latency: Network latency between the Python environment and BigQuery servers can affect query execution time, especially for remote connections.

Best Practices for Performance Optimization
Use Query Filters: Apply filters to SQL queries to reduce the amount of data transferred between BigQuery and Python.
Optimize SQL Queries: Write efficient SQL queries to minimize query execution time and reduce resource consumption.
Cache Query Results: Cache query results in BigQuery to avoid re-executing queries for repeated requests.

Conclusion
BigQuery DataFrames offer a convenient and Pythonic way to interact with Google BigQuery, providing developers with flexibility and ease of use. While they offer several advantages, developers should be aware of potential limitations and performance considerations. By following best practices and optimizing query execution, developers can harness the full potential of BigQuery DataFrames for data analysis and manipulation in Python.
This article describes how to implement a Raft server consensus module in C++20 without using any additional libraries. The narrative is divided into three main sections:

A comprehensive overview of the Raft algorithm
A detailed account of the Raft server's development
A description of a custom coroutine-based network library

The implementation makes use of the robust capabilities of C++20, particularly coroutines, to present an effective and modern methodology for building a critical component of distributed systems. This exposition not only demonstrates the practical application and benefits of C++20 coroutines in sophisticated programming environments, but also provides an in-depth exploration of the challenges and resolutions encountered while building a consensus module, such as a Raft server, from the ground up. The Raft server and network library repositories, miniraft-cpp and coroio, are available for further exploration and practical applications.

Introduction
Before delving into the complexities of the Raft algorithm, let's consider a real-world example. Our goal is to develop a network key-value (K/V) storage system. In C++, this can be easily accomplished with an unordered_map<string, string>. However, in real-world applications, the requirement for a fault-tolerant storage system increases complexity. A seemingly simple approach could entail deploying three (or more) machines, each hosting a replica of this service. The expectation might be for users to manage data replication and consistency themselves. However, this method can result in unpredictable behaviors: for example, it is possible to update data under a specific key and then retrieve an older version later. What users truly want is a distributed system, potentially spread across multiple machines, that runs as smoothly as a single-host system. To meet this requirement, a consensus module is typically placed in front of the K/V storage (or any similar service, hereafter referred to as the "state machine"). This configuration ensures that all user interactions with the state machine are routed exclusively through the consensus module, rather than accessing it directly. With this context in mind, let us now look at how to implement such a consensus module, using the Raft algorithm as an example.

Raft Overview
In the Raft algorithm, there are an odd number of participants known as peers. Each peer keeps its own log of records. There is one leader peer, and the others are followers. Users direct all requests (reads and writes) to the leader. When a write request to change the state machine is received, the leader appends it to its log first before forwarding it to the followers, who also log it. Once a majority of peers have successfully responded, the leader considers the entry committed, applies it to the state machine, and notifies the user of its success.

The Term is a key concept in Raft, and it can only grow. The Term changes when there are system changes, such as a change in leadership. The log in Raft has a specific structure, with each entry consisting of a Term and a Payload. The Term identifies the leader under which the entry was originally written. The Payload represents the changes to be made to the state machine. Raft guarantees that two entries with the same index and term are identical. Raft logs are not append-only and may be truncated. For example, in the scenario below, leader S1 replicated two entries before crashing. S2 took the lead and began replicating entries, so S1's log diverged from those of S2 and S3.
As a result, the last entry in S1's log will be removed and replaced with a new one.

Raft RPC API
Let us examine the Raft RPC. It's worth noting that the Raft API is quite simple, with just two calls. We'll begin with the leader election API. It is important to note that Raft ensures there can be at most one leader per term. There may also be terms without a leader, such as when an election fails. To ensure that only one vote is cast per term, a peer saves its vote in a persistent variable called VotedFor.

The election RPC is called RequestVote and has three parameters: Term, LastLogIndex, and LastLogTerm. The response contains Term and VoteGranted. Notably, every request contains Term, and in Raft, peers can only communicate effectively if their Terms are compatible. When a peer initiates an election, it sends a RequestVote request to the other peers and collects their votes. If the majority of the responses are positive, the peer advances to the leader role.

Now let's look at the AppendEntries request. It takes the parameters Term, PrevLogIndex, PrevLogTerm, and Entries, and the response contains Term and Success. If the Entries field in the request is empty, the request acts as a heartbeat. When an AppendEntries request is received, a follower checks the term of its log entry at PrevLogIndex. If it matches PrevLogTerm, the follower adds Entries to its log beginning at PrevLogIndex + 1 (any existing entries after PrevLogIndex are removed). If the terms do not match, the follower returns Success=false. In this case, the leader retries the request, lowering PrevLogIndex by one.

When a peer receives a RequestVote request, it compares the requestor's (LastLogTerm, LastLogIndex) pair against the term and index of its own most recent log entry. If its own pair is less than or equal to the requestor's (and it has not already voted for another peer in this term), the peer returns VoteGranted=true.

State Transitions in Raft
Raft's state transitions look like this. Each peer begins in the Follower state. If a Follower does not receive AppendEntries within a set timeout, it increments its Term and moves to the Candidate state, triggering an election. A peer can move from the Candidate state to the Leader state if it wins the election, or return to the Follower state if it receives an AppendEntries request. A Candidate can also remain a Candidate and start a new election if it does not transition to either Follower or Leader within the timeout period. If a peer in any state receives an RPC request with a Term greater than its current one, it moves to the Follower state.

Commit
Let us now consider an example that demonstrates how Raft is not as simple as it may appear. I took this example from Diego Ongaro's dissertation. S1 was the leader in Term 2, where it replicated two entries before crashing. Following this, S5 took the lead in Term 3, added an entry, and then crashed. Next, S2 took over leadership in Term 4, replicated the entry from Term 2, added its own entry for Term 4, and then crashed. This results in two possible outcomes: S5 reclaims leadership and truncates the entries from Term 2, or S1 regains leadership and commits the entries from Term 2. The entries from Term 2 are securely committed only once they are covered by a subsequent entry from a new leader. This example demonstrates how the Raft algorithm operates in a dynamic and often unpredictable set of circumstances. The sequence of events, which includes multiple leaders and crashes, demonstrates the complexity of maintaining a consistent state across a distributed system.
This complexity is not immediately apparent, but it becomes important in situations involving leader changes and system failures. The example emphasizes the importance of a robust and well-thought-out approach to dealing with such complexities, which is precisely what Raft seeks to address.

Additional Materials
For further study and a deeper understanding of Raft, I recommend the following materials: the original Raft paper, which is ideal for implementation; Diego Ongaro's PhD dissertation, which provides more in-depth insights; and Maxim Babenko's lecture, which goes into even greater detail.

Raft Implementation
Let's now move on to the Raft server implementation, which, in my opinion, benefits greatly from C++20 coroutines. In my implementation, the persistent state is stored in memory. However, in real-world scenarios, it should be saved to disk. I'll talk more about the MessageHolder later. It functions similarly to a shared_ptr, but is specifically designed to handle Raft messages, ensuring efficient management and processing of these communications.

C++
struct TState {
    uint64_t CurrentTerm = 1;
    uint32_t VotedFor = 0;
    std::vector<TMessageHolder<TLogEntry>> Log;
};

In the volatile state, I labeled entries with L for "leader," F for "follower," or C for "candidate" to clarify where each is used. The CommitIndex denotes the last log entry that was committed. In contrast, LastApplied is the most recent log entry applied to the state machine, and it is always less than or equal to the CommitIndex. The NextIndex is important because it identifies the next log entry to be sent to a peer. Similarly, MatchIndex keeps track of the last log entry known to match on a peer. The Votes set contains the IDs of peers who voted for me. Timeouts are an important aspect to manage: HeartbeatDue and RpcDue manage leader timeouts, while ElectionDue handles follower timeouts.

C++
using TTime = std::chrono::time_point<std::chrono::steady_clock>;

struct TVolatileState {
    uint64_t CommitIndex = 0;                          // L, F
    uint64_t LastApplied = 0;                          // L, F
    std::unordered_map<uint32_t, uint64_t> NextIndex;  // L
    std::unordered_map<uint32_t, uint64_t> MatchIndex; // L
    std::unordered_set<uint32_t> Votes;                // C
    std::unordered_map<uint32_t, TTime> HeartbeatDue;  // L
    std::unordered_map<uint32_t, TTime> RpcDue;        // L
    TTime ElectionDue;                                 // F
};

Raft API
My implementation of the Raft algorithm has two classes. The first is INode, which denotes a peer. This class includes two methods: Send, which stores outgoing messages in an internal buffer, and Drain, which handles actual message dispatch. TRaft is the second class, and it manages the current peer's state. It also includes two methods: Process, which handles incoming messages, and ProcessTimeout, which must be called on a regular basis to manage timeouts, such as the leader election timeout. Users of these classes should use the Process, ProcessTimeout, and Drain methods as necessary. INode's Send method is invoked internally within the TRaft class, ensuring that message handling and state management are seamlessly integrated within the Raft framework.

C++
struct INode {
    virtual ~INode() = default;
    virtual void Send(TMessageHolder<TMessage> message) = 0;
    virtual void Drain() = 0;
};

class TRaft {
public:
    TRaft(uint32_t node,
          const std::unordered_map<uint32_t, std::shared_ptr<INode>>& nodes);

    void Process(TTime now, TMessageHolder<TMessage> message,
                 const std::shared_ptr<INode>& replyTo = {});
    void ProcessTimeout(TTime now);
};

Raft Messages
Now let's look at how I send and read Raft messages.
Instead of using a serialization library, I read and send raw structures in TLV format. This is what the message header looks like:

C++
struct TMessage {
    uint32_t Type;
    uint32_t Len;
    char Value[0];
};

For additional convenience, I've introduced a second-level header:

C++
struct TMessageEx: public TMessage {
    uint32_t Src = 0;
    uint32_t Dst = 0;
    uint64_t Term = 0;
};

This includes the sender's and receiver's IDs in each message. With the exception of LogEntry, all messages inherit from TMessageEx. LogEntry and AppendEntries are implemented as follows:

C++
struct TLogEntry: public TMessage {
    static constexpr EMessageType MessageType = EMessageType::LOG_ENTRY;
    uint64_t Term = 1;
    char Data[0];
};

struct TAppendEntriesRequest: public TMessageEx {
    static constexpr EMessageType MessageType = EMessageType::APPEND_ENTRIES_REQUEST;
    uint64_t PrevLogIndex = 0;
    uint64_t PrevLogTerm = 0;
    uint32_t Nentries = 0;
};

To facilitate message handling, I use a class called MessageHolder, reminiscent of a shared_ptr:

C++
template<typename T>
requires std::derived_from<T, TMessage>
struct TMessageHolder {
    T* Mes;
    std::shared_ptr<char[]> RawData;

    uint32_t PayloadSize;
    std::shared_ptr<TMessageHolder<TMessage>[]> Payload;

    template<typename U>
    requires std::derived_from<U, T>
    TMessageHolder<U> Cast() { ... }

    template<typename U>
    requires std::derived_from<U, T>
    auto Maybe() { ... }
};

This class includes a char array containing the message itself. It may also include a Payload (which is only used for AppendEntries), as well as methods for safely casting a base-type message to a specific one (the Maybe method) and unsafe casting (the Cast method). Here is a typical example of using the MessageHolder:

C++
void SomeFunction(TMessageHolder<TMessage> message) {
    auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
    }

    // if we are sure
    auto appendEntries = message.Cast<TAppendEntriesRequest>();

    // usage with overloaded operator->
    auto term = appendEntries->Term;
    auto nentries = appendEntries->Nentries;
    // ...
}

And a real-life example in the Candidate state handler:

C++
void TRaft::Candidate(TTime now, TMessageHolder<TMessage> message) {
    if (auto maybeResponseVote = message.Maybe<TRequestVoteResponse>()) {
        OnRequestVote(std::move(maybeResponseVote.Cast()));
    } else if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
        OnRequestVote(now, std::move(maybeRequestVote.Cast()));
    } else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
        OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
    }
}

This design approach improves the efficiency and flexibility of message handling in Raft implementations.

Raft Server
Let's discuss the Raft server implementation. The Raft server sets up coroutines for network interactions. First, we'll look at the coroutines that handle message reading and writing. The primitives used for these coroutines are discussed later in the article, along with an analysis of the network library. The writing coroutine is responsible for writing messages to the socket, whereas the reading coroutine is slightly more complex. To read, it must first retrieve the Type and Len variables, then allocate an array of Len bytes, and finally read the rest of the message. This structure facilitates the efficient and effective management of network communications within the Raft server.
C++
template<typename TSocket>
TValueTask<void> TMessageWriter<TSocket>::Write(TMessageHolder<TMessage> message) {
    co_await TByteWriter(Socket).Write(message.Mes, message->Len);

    auto payload = std::move(message.Payload);
    for (uint32_t i = 0; i < message.PayloadSize; ++i) {
        co_await Write(std::move(payload[i]));
    }
    co_return;
}

template<typename TSocket>
TValueTask<TMessageHolder<TMessage>> TMessageReader<TSocket>::Read() {
    decltype(TMessage::Type) type;
    decltype(TMessage::Len) len;

    auto s = co_await Socket.ReadSome(&type, sizeof(type));
    if (s != sizeof(type)) { /* throw */ }
    s = co_await Socket.ReadSome(&len, sizeof(len));
    if (s != sizeof(len)) { /* throw */ }

    auto mes = NewHoldedMessage<TMessage>(type, len);
    co_await TByteReader(Socket).Read(mes->Value, len - sizeof(TMessage));

    auto maybeAppendEntries = mes.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
        auto nentries = appendEntries->Nentries;
        mes.InitPayload(nentries);
        for (uint32_t i = 0; i < nentries; i++) {
            mes.Payload[i] = co_await Read();
        }
    }
    co_return mes;
}

To launch a Raft server, create an instance of the TRaftServer class and call the Serve method. The Serve method starts two coroutines. The Idle coroutine is responsible for periodically processing timeouts, whereas InboundServe manages incoming connections.

C++
class TRaftServer {
public:
    void Serve() {
        Idle();
        InboundServe();
    }

private:
    TVoidTask InboundServe();
    TVoidTask InboundConnection(TSocket socket);
    TVoidTask Idle();
};

Incoming connections are received via the Accept call. Following this, the InboundConnection coroutine is launched, which reads incoming messages and forwards them to the Raft instance for processing. This configuration ensures that the Raft server can efficiently handle both internal timeouts and external communication.

C++
TVoidTask InboundServe() {
    while (true) {
        auto client = co_await Socket.Accept();
        InboundConnection(std::move(client));
    }
    co_return;
}

TVoidTask InboundConnection(TSocket socket) {
    while (true) {
        auto mes = co_await TMessageReader(client->Sock()).Read();
        Raft->Process(std::chrono::steady_clock::now(), std::move(mes), client);
        Raft->ProcessTimeout(std::chrono::steady_clock::now());
        DrainNodes();
    }
    co_return;
}

The Idle coroutine works as follows: it calls the ProcessTimeout method once per sleep interval. Note that this coroutine uses an asynchronous sleep. This design enables the Raft server to efficiently manage time-sensitive operations without blocking other processes, improving the server's overall responsiveness and performance.

C++
while (true) {
    Raft->ProcessTimeout(std::chrono::steady_clock::now());
    DrainNodes();
    auto t1 = std::chrono::steady_clock::now();
    if (t1 > t0 + dt) {
        DebugPrint();
        t0 = t1;
    }
    co_await Poller.Sleep(t1 + sleep);
}

The coroutine for sending outgoing messages is designed to be simple: it repeatedly sends all accumulated messages to the socket in a loop. In the event of an error, it starts another coroutine that is responsible for reconnecting (via the Connect function). This structure ensures that outgoing messages are handled smoothly and efficiently while remaining robust through error handling and connection management.
C++
try {
    while (!Messages.empty()) {
        auto tosend = std::move(Messages);
        Messages.clear();
        for (auto&& m : tosend) {
            co_await TMessageWriter(Socket).Write(std::move(m));
        }
    }
} catch (const std::exception& ex) {
    Connect();
}
co_return;

With the Raft server implemented, these examples show how coroutines greatly simplify development. While we haven't looked into the internals of the TRaft class itself (trust me, it's much more complex than the Raft server), the overall algorithm is not only simple but also compact in design. Next, we'll look at some Raft server examples. Following that, I'll describe the network library I created from scratch specifically for the Raft server. This library is critical to enabling efficient network communication within the Raft framework.

Here's an example of launching a Raft cluster with three nodes. Each instance receives its own ID as an argument, as well as the other instances' addresses and IDs. In this case, the client communicates exclusively with the leader. It sends random strings while keeping a set number of in-flight messages and waiting for their commitment. This configuration depicts the interaction between the client and the leader in a multi-node Raft environment, demonstrating the algorithm's handling of distributed data and consensus.

Shell
$ ./server --id 1 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0, ...
Leader, Term: 3, Index: 1080175, CommitIndex: 1080175, Delay: 2:0 3:0
MatchIndex: 2:1080175 3:1080175 NextIndex: 2:1080176 3:1080176
...

$ ./server --id 2 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...

$ ./server --id 3 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175, ...

$ dd if=/dev/urandom | base64 | pv -l | ./client --node 127.0.0.1:8001:1 >log1
198k 0:00:03 [159.2k/s] [ <=>

I measured the commit latency for configurations of both 3-node and 5-node clusters. As expected, the latency is higher for the 5-node setup:

3 Nodes
50th percentile (median): 292,872 ns
80th percentile: 407,561 ns
90th percentile: 569,164 ns
99th percentile: 40,279,001 ns

5 Nodes
50th percentile (median): 425,194 ns
80th percentile: 672,541 ns
90th percentile: 1,027,669 ns
99th percentile: 38,578,749 ns

I/O Library
Let's now look at the I/O library that I created from scratch and used in the Raft server's implementation. I began with the example below, taken from cppreference.com, which is an implementation of an echo server:

C++
task<> tcp_echo_server() {
    char data[1024];
    while (true) {
        std::size_t n = co_await socket.async_read_some(buffer(data));
        co_await async_write(socket, buffer(data, n));
    }
}

My library required an event loop, a socket primitive, and methods like read_some/write_some (named ReadSome/WriteSome in my library), as well as higher-level wrappers such as async_write/async_read (named TByteReader/TByteWriter in my library). To implement the ReadSome method of the socket, I had to create an Awaitable as follows:

C++
auto ReadSome(char* buf, size_t size) {
    struct TAwaitable {
        bool await_ready() { return false; /* always suspend */ }

        void await_suspend(std::coroutine_handle<> h) {
            poller->AddRead(fd, h);
        }

        int await_resume() {
            return read(fd, b, s);
        }

        TSelect* poller;
        int fd;
        char* b;
        size_t s;
    };
    return TAwaitable{Poller_, Fd_, buf, size};
}

When co_await is called, the coroutine suspends because await_ready returns false.
In await_suspend, we capture the coroutine_handle and pass it along with the socket descriptor to the poller. When the socket is ready, the poller calls the coroutine_handle to resume the coroutine. Upon resumption, await_resume is called, which performs a read and returns the number of bytes read to the coroutine. The WriteSome, Accept, and Connect methods are implemented in a similar manner. The Poller is set up as follows:

C++
struct TEvent {
    int Fd;
    int Type; // READ = 1, WRITE = 2
    std::coroutine_handle<> Handle;
};

class TSelect {
    void Poll() {
        for (const auto& ch : Events) {
            /* FD_SET(ReadFds); FD_SET(WriteFds); */
        }
        pselect(Size, ReadFds, WriteFds, nullptr, ts, nullptr);

        for (int k = 0; k < Size; ++k) {
            if (FD_ISSET(k, WriteFds)) {
                Events[k].Handle.resume();
            }
            // ...
        }
    }

    std::vector<TEvent> Events;
    // ...
};

I keep an array of pairs (socket descriptor, coroutine handle) that are used to initialize the structures for the poller backend (in this case, select). Resume is called to wake up the coroutines corresponding to ready sockets. This is applied in the main function as follows:

C++
TSimpleTask task(TSelect& poller) {
    TSocket socket(0, poller);
    char buffer[1024];
    while (true) {
        auto readSize = co_await socket.ReadSome(buffer, sizeof(buffer));
    }
}

int main() {
    TSelect poller;
    task(poller);
    while (true) {
        poller.Poll();
    }
}

We start a coroutine (or coroutines) that enters sleep mode on co_await, and control is then passed to an infinite loop that invokes the poller mechanism. If a socket becomes ready within the poller, the corresponding coroutine is triggered and executed until the next co_await. To read and write Raft messages, I needed to create high-level wrappers over ReadSome/WriteSome, similar to:

C++
TValueTask<T> Read() {
    T res;
    size_t size = sizeof(T);
    char* p = reinterpret_cast<char*>(&res);
    while (size != 0) {
        auto readSize = co_await Socket.ReadSome(p, size);
        p += readSize;
        size -= readSize;
    }
    co_return res;
}

// usage
T t = co_await Read<T>();

To implement these, I needed to create a coroutine that also functions as an Awaitable. A coroutine is made up of a pair: a coroutine_handle and a promise. The coroutine_handle is used to manage the coroutine from the outside, whereas the promise is for internal management. The coroutine_handle can include Awaitable methods, which allow the coroutine's result to be awaited with co_await. The promise can be used to store the result returned by co_return and to awaken the calling coroutine. In the coroutine_handle, within the await_suspend method, we store the coroutine_handle of the calling coroutine. Its value will be saved in the promise:

C++
template<typename T>
struct TValueTask : std::coroutine_handle<> {
    bool await_ready() {
        return !!this->promise().Value;
    }

    void await_suspend(std::coroutine_handle<> caller) {
        this->promise().Caller = caller;
    }

    T await_resume() {
        return *this->promise().Value;
    }

    using promise_type = TValuePromise<T>;
};

Within the promise itself, the return_value method will store the returned value. The calling coroutine is woken up with an awaitable, which is returned in final_suspend. This is because, after co_return, the compiler invokes co_await on final_suspend.
C++
template<typename T>
struct TValuePromise {
    void return_value(const T& t) {
        Value = t;
    }

    std::suspend_never initial_suspend() {
        return {};
    }

    // resume Caller here
    TFinalSuspendContinuation<T> final_suspend() noexcept;

    std::optional<T> Value;
    std::coroutine_handle<> Caller = std::noop_coroutine();
};

From await_suspend, the handle of the calling coroutine can be returned, and it will be automatically resumed. It is important to note that the called coroutine will now be in a sleeping state, and its coroutine_handle must be destroyed with destroy to avoid a memory leak. This can be accomplished, for example, in the destructor of TValueTask.

C++
template<typename T>
struct TFinalSuspendContinuation {
    bool await_ready() noexcept {
        return false;
    }

    std::coroutine_handle<> await_suspend(
        std::coroutine_handle<TValuePromise<T>> h) noexcept
    {
        return h.promise().Caller;
    }

    void await_resume() noexcept { }
};

With the library description completed, I ported the libevent benchmark to it to verify its performance. This benchmark generates a chain of N Unix pipes, each one linked to the next. It then initiates 100 write operations into the chain, which continue until there are 1000 total write calls. The image below depicts the benchmark's runtime as a function of N for various backends of my library (coroio) versus libevent. This test demonstrates that my library performs similarly to libevent, confirming its efficiency and effectiveness in managing I/O operations.

Conclusion

In closing, this article has described the implementation of a Raft server using C++20 coroutines, emphasizing the convenience and efficiency provided by this modern C++ feature. The custom I/O library, which was written from scratch, is critical to this implementation because it effectively handles asynchronous I/O operations. The performance of the library was validated against the libevent benchmark, demonstrating its competence. For those interested in learning more about or using these tools, the I/O library is available at coroio, and the Raft library at miniraft-cpp (linked at the beginning of the article). Both repositories provide a detailed look at how C++20 coroutines can be used to build robust, high-performance distributed systems.
Introduction to Datafaker

Datafaker is a modern framework that enables JVM programmers to efficiently generate fake data for their projects, offering over 200 data providers that allow for quick setup and usage. Custom providers can be written when you need domain-specific data. In addition to providers, the generated data can be exported to popular formats like CSV, JSON, SQL, XML, and YAML. For a good introduction to the basic features, see "Datafaker: An Alternative to Using Production Data." Datafaker offers many features, such as working with sequences and collections and generating custom objects based on schemas (see "Datafaker 2.0").

Bulk Data Generation

In software development and testing, the need to generate data for various purposes arises frequently, whether it's to conduct non-functional tests or to simulate burst loads. Let's consider a straightforward scenario in which we have the task of generating 10,000 messages in JSON format to be sent to RabbitMQ. From my perspective, these options are worth considering:

- Developing your own tool: One option is to write a custom application from scratch to generate these records (messages). If the generated data needs to be more realistic, it makes sense to use Datafaker or JavaFaker.
- Using specific tools: Alternatively, we could select tools designed for particular databases or message brokers. For example, tools like voluble for Kafka provide specialized functionality for generating and publishing messages to Kafka topics. There are also more modern tools like ShadowTraffic, which is currently under development and directed towards a container-based approach, which may not always be necessary.
- Datafaker Gen: Finally, we have the option to use Datafaker Gen, which I want to consider in the current article.

Datafaker Gen Overview

Datafaker Gen offers a command-line generator based on the Datafaker library which allows for the continuous generation of data in various formats and integration with different storage systems, message brokers, and backend services. Since this tool is built on Datafaker, the generated data can be quite realistic. Configuration of the schema, format type, and sink can be done without rebuilding the project. Datafaker Gen consists of the following main components that can be configured:

1. Schema Definition

Users can define the schema for their records in the config.yaml file. The schema specifies the field definitions of the record based on the Datafaker provider. It also allows for the definition of embedded fields.

YAML
default_locale: en-EN
fields:
  - name: lastname
    generators: [ Name#lastName ]
  - name: firstname
    generators: [ Name#firstName ]

2. Format

Datafaker Gen allows users to specify the format in which records will be generated. Currently, there are basic implementations for CSV, JSON, SQL, XML, and YAML formats. Additionally, formats can be extended with custom implementations. The configuration for formats is specified in the output.yaml file.

YAML
formats:
  csv:
    quote: "@"
    separator: $$$$$$$
  json:
    formattedAs: "[]"
  yaml:
  xml:
    pretty: true

3. Sink

The sink component determines where the generated data will be stored or published. The basic implementation includes command-line output and text file sinks. Additionally, sinks can be extended with custom implementations such as RabbitMQ, as demonstrated in the current article. The configuration for sinks is specified in the output.yaml file.
YAML
sinks:
  rabbitmq:
    batchsize: 1 # when 1, a message contains 1 document; when >1, a message contains a batch of documents
    host: localhost
    port: 5672
    username: guest
    password: guest
    exchange: test.direct.exchange
    routingkey: products.key

Extensibility via Java SPI

Datafaker Gen uses the Java SPI (Service Provider Interface) to make it easy to add new formats or sinks. This extensibility allows for customization of Datafaker Gen according to specific requirements.

How To Add a New Sink in Datafaker Gen

Before adding a new sink, you may want to check if it already exists in the datafaker-gen-examples repository. If it does not, you can refer to the examples there for how to add a new sink. When it comes to extending Datafaker Gen with new sink implementations, developers have two primary options to consider:

1. Use datafaker-gen-examples as the parent project and implement the sink interfaces for your extension, similar to the sinks already available in that repository.
2. Include dependencies from the Maven repository to access the required interfaces. For this approach, Datafaker Gen should be built and exist in the local Maven repository. This approach provides flexibility in project structure and requirements.

1. Implementing RabbitMQ Sink

To add a new RabbitMQ sink, one simply needs to implement the net.datafaker.datafaker_gen.sink.Sink interface. This interface contains two methods:

- getName: This method defines the sink name.
- run: This method triggers the generation of records and then sends or saves all the generated records to the specified destination. The method parameters include the configuration specific to this sink retrieved from the output.yaml file, as well as the data generation function and the desired number of lines to be generated.

Java
import java.util.Map;
import java.util.function.Function;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import net.datafaker.datafaker_gen.sink.Sink;

public class RabbitMqSink implements Sink {

    @Override
    public String getName() {
        return "rabbitmq";
    }

    @Override
    public void run(Map<String, ?> config, Function<Integer, ?> function, int numberOfLines) {
        // Read output configuration from output.yaml
        int numberOfLinesToPrint = numberOfLines;
        String host = (String) config.get("host");
        int port = (int) config.get("port");
        String username = (String) config.get("username");
        String password = (String) config.get("password");
        String exchange = (String) config.get("exchange");
        String routingKey = (String) config.get("routingkey");

        // Generate lines
        String lines = (String) function.apply(numberOfLinesToPrint);

        // Send or save the results to the expected resource;
        // in this case, connect to RabbitMQ and publish the messages.
        ConnectionFactory factory = getConnectionFactory(host, port, username, password);
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();
            JsonArray jsonArray = JsonParser.parseString(lines).getAsJsonArray();
            jsonArray.forEach(jsonElement -> {
                try {
                    channel.basicPublish(exchange, routingKey, null, jsonElement.toString().getBytes());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2. Adding Configuration for the New RabbitMQ Sink

As previously mentioned, the configuration for sinks or formats can be added to the output.yaml file. The specific fields may vary depending on your custom sink. Below is an example configuration for a RabbitMQ sink:

YAML
sinks:
  rabbitmq:
    batchsize: 1 # when 1, a message contains 1 document; when >1, a message contains a batch of documents
    host: localhost
    port: 5672
    username: guest
    password: guest
    exchange: test.direct.exchange
    routingkey: products.key
3. Adding Custom Sink via SPI

Adding a custom sink via SPI (Service Provider Interface) involves including the provider configuration in the ./resources/META-INF/services/net.datafaker.datafaker_gen.sink.Sink file. This file contains the fully qualified names of the sink implementations:

Properties files
net.datafaker.datafaker_gen.sink.RabbitMqSink

Those are the three simple steps needed to extend Datafaker Gen. In this example, we have not provided the complete implementation of the sink, nor shown how to use additional libraries. To see the complete implementations, you can refer to the datafaker-gen-rabbitmq module in the example repository.

How To Run

Step 1

Build a JAR file based on the new implementation:

Shell
./mvnw clean verify

Step 2

Define the schema for records in the config.yaml file and place this file in the appropriate location where the generator should run. Additionally, define the sinks and formats in the output.yaml file, as demonstrated previously.

Step 3

Datafaker Gen can be executed through two options:

1. Use the bash script from the bin folder in the parent project:

Shell
# Format json, number of lines 10000 and new RabbitMq Sink
bin/datafaker_gen -f json -n 10000 -sink rabbitmq

2. Execute the JAR directly, like this:

Shell
java -cp [path_to_jar] net.datafaker.datafaker_gen.DatafakerGen -f json -n 10000 -sink rabbitmq

How Fast Is It?

The test was done based on the schema described above, which means that one document consists of two fields. Documents are recorded one by one in the RabbitMQ queue in JSON format. The table below shows the speed for 10,000, 100,000, and 1M records on my local machine:

Records     Time
10,000      401 ms
100,000     11,613 ms
1,000,000   121,601 ms

Conclusion

The Datafaker Gen tool enables the creation of flexible and fast data generators for various types of destinations. Built on Datafaker, it facilitates realistic data generation. Developers can easily configure the content of records, formats, and sinks to suit their needs. As a simple Java application, it can be deployed anywhere you want, whether it's in Docker or on-premise machines. The full source code is available here. I would like to thank Sergey Nuyanzin for reviewing this article. Thank you for reading, and I am glad to be of help.
Function pipelines allow seamless execution of multiple functions in a sequential manner, where the output of one function serves as the input to the next. This approach helps in breaking down complex tasks into smaller, more manageable steps, making code more modular, readable, and maintainable. Function pipelines are commonly used in functional programming paradigms to transform data through a series of operations. They promote a clean and functional style of coding, emphasizing the composition of functions to achieve desired outcomes.

In this article, we will explore the fundamentals of function pipelines in Python, including how to create and use them effectively. We'll discuss techniques for defining pipelines, composing functions, and applying pipelines to real-world scenarios.

Creating Function Pipelines in Python

In this segment, we'll explore two instances of function pipelines. In the initial example, we'll define three functions, 'add', 'multiply', and 'subtract', each designed to execute a fundamental arithmetic operation as implied by its name.

Python
def add(x, y):
    return x + y

def multiply(x, y):
    return x * y

def subtract(x, y):
    return x - y

Next, create a pipeline function that takes any number of functions as arguments and returns a new function. This new function applies each function in the pipeline to the input data sequentially.

Python
# Pipeline takes multiple functions as arguments and returns an inner function
def pipeline(*funcs):
    def inner(data):
        result = data
        # Iterate through every function
        for func in funcs:
            result = func(result)
        return result
    return inner

Let's understand the pipeline function. The pipeline function takes any number of functions (*funcs) as arguments and returns a new function (inner). The inner function accepts a single argument (data) representing the input data to be processed by the function pipeline. Inside the inner function, a loop iterates over each function in the funcs list. For each function func in the funcs list, the inner function applies func to the result variable, which initially holds the input data. The result of each function call becomes the new value of result. After all functions in the pipeline have been applied to the input data, the inner function returns the final result.

Next, we create a function called 'calculation_pipeline' that passes 'add', 'multiply', and 'subtract' to the pipeline function.

Python
# Create function pipeline
calculation_pipeline = pipeline(
    lambda x: add(x, 5),
    lambda x: multiply(x, 2),
    lambda x: subtract(x, 10)
)

Then we can test the function pipeline by passing an input value through the pipeline.

Python
result = calculation_pipeline(10)
print(result)  # Output: 20

We can visualize the concept of a function pipeline through a simple diagram.
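As a side note (a sketch that is not part of the original example), the same chaining can be expressed more compactly with functools.reduce, which folds the input value through the sequence of functions:

Python
from functools import reduce

# Equivalent pipeline built on functools.reduce instead of an explicit loop
def reduce_pipeline(*funcs):
    def inner(data):
        # Fold data through every function, left to right
        return reduce(lambda acc, func: func(acc), funcs, data)
    return inner

calculation_pipeline = reduce_pipeline(
    lambda x: add(x, 5),
    lambda x: multiply(x, 2),
    lambda x: subtract(x, 10)
)
print(calculation_pipeline(10))  # Output: 20

Which style to prefer is largely a matter of taste; the explicit loop is arguably easier to step through in a debugger.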
Another example:

Python
def validate(text):
    if text is None or not text.strip():
        raise ValueError("String is null or empty")
    return text

def remove_special_chars(text):
    for char in "!@#$%^&*()_+{}[]|\":;'<>?,./":
        text = text.replace(char, "")
    return text

def capitalize_string(text):
    return text.upper()

# Pipeline takes multiple functions as arguments and returns an inner function
def pipeline(*funcs):
    def inner(data):
        result = data
        # Iterate through every function
        for func in funcs:
            result = func(result)
        return result
    return inner

# Create function pipeline
str_pipeline = pipeline(
    lambda x: validate(x),
    lambda x: remove_special_chars(x),
    lambda x: capitalize_string(x)
)

Testing the pipeline by passing the correct input:

Python
# Test the function pipeline
result = str_pipeline("Test@!!!%#Abcd")
print(result)  # TESTABCD

In case of an empty or null string, the pipeline stops with an error:

Python
result = str_pipeline("")  # Raises ValueError: String is null or empty

In the example, we've established a pipeline that begins by validating the input to ensure it's not empty. If the input passes this validation, it proceeds to the 'remove_special_chars' function, followed by the 'capitalize_string' function.

Benefits of Creating Function Pipelines

Function pipelines encourage modular code design by breaking down complex tasks into smaller, composable functions. Each function in the pipeline focuses on a specific operation, making it easier to understand and modify the code. By chaining together functions in a sequential manner, function pipelines promote clean and readable code, making it easier for other developers to understand the logic and intent behind the data processing workflow. Function pipelines are also flexible and adaptable, allowing developers to easily modify or extend existing pipelines to accommodate changing requirements, as the short sketch below illustrates.
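For instance, here is a hypothetical extension (the trim step is not part of the original article) that appends one more function to the existing chain without touching any of the other steps:

Python
def trim(text):
    # Hypothetical extra step: strip leading and trailing whitespace
    return text.strip()

# Extend the existing pipeline by appending one more function
extended_str_pipeline = pipeline(
    lambda x: validate(x),
    lambda x: remove_special_chars(x),
    lambda x: capitalize_string(x),
    lambda x: trim(x)
)

print(extended_str_pipeline("  Test@Abcd  "))  # TESTABCD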
It is often said that software developers should create simple solutions to the problems that they are presented with. However, coming up with a simple solution is not always easy, as it requires time, experience, and a good approach. And to make matters worse, a simple solution will often neither impress your co-workers nor give your resume a boost. Ironically, the quest for simplicity in software development is often a complex journey. A developer must navigate through a labyrinth of technical constraints, user requirements, and evolving technological landscapes. The catch-22 is palpable: while a simple solution is desirable, it is not easily attained nor universally appreciated. In the competitive world of software development, where complexity often disguises itself as sophistication, simple solutions may not always resonate with the awe and admiration they deserve. They may go unnoticed in a culture that frequently equates complexity with competence.

Furthermore, the pursuit of simplicity can sometimes be a thankless endeavor. In an environment where complex designs and elaborate architectures are often celebrated, a minimalist approach might not captivate colleagues or stand out in a portfolio. This dichotomy presents a unique challenge for software developers who want to balance the art of simplicity with the practicalities of career advancement and peer recognition. As we get closer to the point of this discussion, I will share my personal experiences in grappling with the "curse of simplicity." These experiences shed light on the nuanced realities of being a software developer committed to simplicity in a world that often rewards complexity.

The Story

Several years ago, I was part of a Brazilian startup confronted with a daunting issue. The accounting report crucial for tax payment to São Paulo's city administration had been rendered dysfunctional due to numerous changes in its data sources. These modifications stemmed from shifts in payment structures with the company's partners. The situation escalated when the sole analyst responsible for manually generating the report went on vacation, leaving the organization vulnerable to substantial fines from the city hall. To solve the problem, the company's CFO called a small committee to put forward a solution. In advocating for a resolution, I argued against revisiting the complex, defunct legacy solution and proposed a simpler approach. I was convinced that we needed "one big table" with all the columns necessary for the report and that each row should have the granularity of a transaction. This way, the report could be generated by simply flattening the data with a single query. Loading the data into this table should be done by a simple, secure, and replicable process.

My team concurred with my initial proposal and embarked on its implementation, following two fundamental principles: The solution had to be altruistic and crafted for others to utilize and maintain. It had to be code-centric, with automated deployment and code reviews through Pull Requests (PR). We selected Python as our programming language due to the data analysis team's familiarity with it and its reputation for being easy to master. In our tool exploration, we came across Airflow, which had been gaining popularity even before its version 1.0 release. Airflow employs DAGs (Directed Acyclic Graphs) to construct workflows, where each step is executed via what is termed "operators."
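To give a feel for that model, here is a hypothetical sketch (using today's Airflow API, not the startup's actual code) of a DAG that loads transaction rows into the "one big table" and then refreshes the report:

Python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def load_transactions():
    # Copy transaction rows from the operational databases into the big table
    ...

def refresh_report():
    # Flatten the big table into the accounting report with a simple query
    ...

with DAG(
    dag_id="accounting_report",  # hypothetical name
    start_date=datetime(2017, 1, 1),
    schedule_interval="@daily",
) as dag:
    load = PythonOperator(task_id="load_transactions", python_callable=load_transactions)
    report = PythonOperator(task_id="refresh_report", python_callable=refresh_report)

    load >> report  # the report step runs only after loading succeeds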
Our team developed two straightforward operators: one for transferring data between tables in different databases, and another for schema migration. This approach allowed for local testing of DAG changes, with the deployment process encompassing Pull Requests followed by a CI/CD pipeline that deployed changes to production. The schema migration operator bore a close resemblance to Ruby on Rails migrations. We hosted Airflow on AWS Elastic Beanstalk, and Jenkins was employed for the deployment pipeline. During this period, Metabase was already operational for querying databases. Within a span of two to three weeks, our solution was up and running. The so-called "one big table" effectively provided the accounting report. It was user-friendly and, most crucially, comprehensible to everyone involved. The data analysis team, thrilled by the architecture, began adopting this infrastructure for all their reporting needs. A year down the line, the landscape had transformed significantly, with dozens of DAGs in place, hundreds of reporting tables created, and thousands of schema migration files in existence.

Synopsis of the Solution

In essence, our simple solution might not have seemed fancy, but it was super effective. It allowed the data analysis team to generate reports more quickly and easily, and it saved the company money on fines. The concept of the "curse of simplicity" in software development is a paradoxical phenomenon. It suggests that solutions that appear simple on the surface are often undervalued, especially when compared to their more complex counterparts, which I like to refer to as "complex megazords." This journey of developing a straightforward yet effective solution was an eye-opener for me, and it altered my perspective on the nature of simplicity in problem-solving. There's a common misconception that simple equates to easy. The reality, as demonstrated by the example I have provided, is quite the contrary: crafting a solution that is both simple and effective requires a deep understanding of the problem, a sophisticated level of knowledge, and a wealth of experience. It's about distilling complex ideas and processes into their most essential form without losing their effectiveness. What I've come to realize is that simple solutions, though they may seem less impressive at first glance, are often superior. Their simplicity makes them more accessible and easier to understand, maintain, and use. This accessibility is crucial in a world where technology is rapidly evolving and there is a need for user-friendly, maintainable solutions.
SIEM solutions didn't work perfectly when they were first introduced in the early 2000s, partly because of their architecture and functionality at the time, but also due to the faults in the data and data sources that were fed into them. During this period, data inputs were often rudimentary, lacked scalability, and necessitated extensive manual intervention across operational phases. Three of those data sources stood out.

1. Hand-Coded Application Layer Security

Coincidentally, application layer security became a thing when SIEM solutions were first introduced. Around that time, it became obvious that defending the perimeter, hosts, and endpoints was not sufficient security for applications. Some developers experimented with manually coding application security layers to bolster protection against functionality-specific attacks. While this approach provided an additional security layer, it failed to provide SIEM solutions with accurate data because developers were accustomed to writing code to handle use cases, not abuse cases. They didn't have the experience or knowledge to anticipate all likely attacks and write the complex code needed to collect, or authorize access to, data related to those attacks. Moreover, many sophisticated attacks necessitated correlating events across multiple applications and data sources, which was beyond the scope of individual applications and their hand-coded security layers.

2. SPAN and TAP Ports

SPAN ports, also known as mirror ports or monitor ports, were configured on network switches or routers to copy and forward traffic from one or more source ports to a designated monitoring port. They operated within the network infrastructure and allowed admins to monitor network traffic without disrupting the flow of data to the intended destination. On the other hand, TAP ports were hardware devices that passively captured and transmitted network traffic from one network segment to another. TAPs operated independently of network switches and routers but still provided complete visibility into network traffic regardless of network topology or configuration. Despite offering complete visibility into network traffic, these ports fell out of favor in SIEM integration due to their deficiency in contextual information. The raw packet data that SPAN and TAP ports collected lacked the necessary context for effective threat detection and analysis, alongside challenges such as limited network visibility, complex configuration, and inadequate capture of encrypted traffic.

3. The 2000s REST API

As a successor to the SOAP API, the REST API revolutionized data exchange with its simplicity, speed, efficiency, and statelessness. Aligned with the rise of cloud solutions, REST APIs served as an ideal conduit between SIEM and cloud environments, offering standardized access to diverse data sources. However, they had downsides, one of which was network efficiency. REST APIs sometimes over-fetched or under-fetched data, which resulted in inefficient data transfer between the API and the SIEM solution. There was also the issue of evolving schemas in REST APIs. Without a strongly typed schema, SIEM solutions found it difficult to accurately map incoming data fields to the predefined schema, leading to parsing errors or data mismatches. Then there was the issue of complexity and the learning curve.
REST API implementation is known to be complex, especially in managing authentication, pagination, rate limiting, and error handling. Because of this complexity, security analysts and admins responsible for configuring SIEM data sources found it difficult to handle the integrations effectively, or even required additional training to do so. This also led to configuration errors, which then affected data collection and analysis. While some of the above data sources have not been completely scrapped, their technologies have been greatly improved, and they now have seamless integrations.

Most Recently Used SIEM Data Sources

1. Cloud Logs

The cloud was introduced in 2006 when Amazon launched AWS EC2, followed by Salesforce's Service Cloud solution in 2009. It offers unparalleled scalability, empowering organizations to manage vast volumes of log data effortlessly. Additionally, it provides centralized logging and monitoring capabilities, streamlining data collection and analysis for SIEM solutions. With built-in security features and compliance controls, cloud logs enable SIEM solutions to swiftly detect and respond to security threats. However, challenges accompany these advantages. According to Adam Praksch, a SIEM administrator at IBM, SIEM solutions often struggle to keep pace with the rapid evolution of cloud solutions, resulting in the accumulation of irrelevant events or inaccurate data. Furthermore, integrating SIEM solutions with both on-premises and cloud-based systems increases complexity and cost, as noted by Mohamed El Bagory, a SIEM Technical Instructor at LogRhythm. Notwithstanding, El Bagory acknowledged the vast potential of cloud data for SIEM solutions, emphasizing the need to explore beyond basic information from SSH logins and Chrome tabs to include data from command lines and process statistics.

2. IoT Device Logs

As Praksch rightly said, any IT or OT technology that creates logs or reports about its operation is already used for security purposes. This is because IoT devices are known to generate a wealth of rich data about their operations, interactions, and environments. IoT devices, renowned for producing diverse data types such as logs, telemetry, and alerts, are considered a SIEM solution's favorite data source. This data diversity allows SIEM solutions to analyze different aspects of the network and identify anomalies or suspicious behavior.

Conclusion

In conclusion, as Praksch put it, "The more data a SIEM solution can work with, the higher its chances of successfully monitoring an organization's environment against cyber threats." So, while most SIEM data sources date back to the inception of the technology, they have gone through several evolution stages to make sure they are extracting accurate and meaningful data for threat detection.
In this post, you will take a closer look at embedding documents to be used for a semantic search. By means of examples, you will learn how embedding influences the search result and how you can improve the results. Enjoy!

Introduction

In a previous post, a chat with documents using LangChain4j and LocalAI was discussed. One of the conclusions was that the document format has a large influence on the results. In this post, you will take a closer look at the influence of source data and the way it is embedded in order to get a better search result.

The source documents are two Wikipedia documents. You will use the discography and list of songs recorded by Bruce Springsteen. The interesting part of these documents is that they contain facts and are mainly in a table format. The same documents were used in the previous post, so it will be interesting to see how the findings from that post compare to the approach used in this post. This blog can be read without reading the previous blogs if you are familiar with the concepts used. If not, it is recommended to read the previous blogs as mentioned in the prerequisites paragraph. The sources used in this blog can be found on GitHub.

Prerequisites

The prerequisites for this blog are:
- Basic knowledge of embedding and vector stores
- Basic Java knowledge: Java 21 is used
- Basic knowledge of LangChain4j - see the previous blogs: How to Use LangChain4j With LocalAI and LangChain4j: Chat With Documents
- You need LocalAI if you want to run the examples at the end of this blog. See a previous blog on how you can make use of LocalAI. Version 2.2.0 is used for this blog.

Embed Whole Document

The easiest way to embed a document is to read the document, split it into chunks, and embed the chunks. Embedding means transforming the text into vectors (numbers). The question you will ask also needs to be embedded. The vectors are stored in a vector store, which is able to find the results that are the closest to your question and will respond with these results.

The source code consists of the following parts:
- The text needs to be embedded, so an embedding model is needed. For simplicity, use the AllMiniLmL6V2EmbeddingModel. This model uses the BERT model, which is a popular embedding model.
- The embeddings need to be stored in an embedding store. Often, a vector database is used for this purpose, but in this case, you can use an in-memory embedding store.
- Read the two documents and add them to a DocumentSplitter. Here you define that the documents are split into chunks of 500 characters with no overlap.
- By means of the DocumentSplitter, the documents are split into TextSegments.
- The embedding model is used to embed the TextSegments. The TextSegments and their embedded counterparts are stored in the embedding store.
- The question is also embedded with the same model.
- Ask the embedding store to find relevant embedded segments to the embedded question. You can define how many results the store should retrieve. In this case, only one result is asked for.
If a match is found, the following information is printed to the console:
- The score: a number indicating how well the result corresponds to the question
- The original text: the text of the segment
- The metadata: will show you the document the segment comes from

Java
private static void askQuestion(String question) {
    EmbeddingModel embeddingModel = new AllMiniLmL6V2EmbeddingModel();
    EmbeddingStore<TextSegment> embeddingStore = new InMemoryEmbeddingStore<>();

    // Read and split the documents into segments of 500 characters
    Document springsteenDiscography = loadDocument(toPath("example-files/Bruce_Springsteen_discography.pdf"));
    Document springsteenSongList = loadDocument(toPath("example-files/List_of_songs_recorded_by_Bruce_Springsteen.pdf"));
    ArrayList<Document> documents = new ArrayList<>();
    documents.add(springsteenDiscography);
    documents.add(springsteenSongList);

    DocumentSplitter documentSplitter = DocumentSplitters.recursive(500, 0);
    List<TextSegment> documentSegments = documentSplitter.splitAll(documents);

    // Embed the segments
    Response<List<Embedding>> embeddings = embeddingModel.embedAll(documentSegments);
    embeddingStore.addAll(embeddings.content(), documentSegments);

    // Embed the question and find relevant segments
    Embedding queryEmbedding = embeddingModel.embed(question).content();
    List<EmbeddingMatch<TextSegment>> embeddingMatch = embeddingStore.findRelevant(queryEmbedding, 1);
    System.out.println(embeddingMatch.get(0).score());
    System.out.println(embeddingMatch.get(0).embedded().text());
    System.out.println(embeddingMatch.get(0).embedded().metadata());
}

The questions are the following, and are some facts that can be found in the documents:

Java
public static void main(String[] args) {
    askQuestion("on which album was \"adam raised a cain\" originally released?");
    askQuestion("what is the highest chart position of \"Greetings from Asbury Park, N.J.\" in the US?");
    askQuestion("what is the highest chart position of the album \"tracks\" in canada?");
    askQuestion("in which year was \"Highway Patrolman\" released?");
    askQuestion("who produced \"all or nothin' at all?\"");
}

Question 1

The following is the result for question 1: "On which album was 'Adam Raised a Cain' originally released?"

Shell
0.6794537224516205
Jim Cretecos 1973 [14] "57 Channels (And Nothin' On)" Bruce Springsteen Human Touch Jon Landau Chuck Plotkin Bruce Springsteen Roy Bittan 1992 [15] "7 Rooms of Gloom" (Four Tops cover) Holland–Dozier– Holland † Only the Strong Survive Ron Aniello Bruce Springsteen 2022 [16] "Across the Border" Bruce Springsteen The Ghost of Tom Joad Chuck Plotkin Bruce Springsteen 1995 [17] "Adam Raised a Cain" Bruce Springsteen Darkness on the Edge of Town Jon Landau Bruce Springsteen Steven Van
Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/example-files, index=4, file_name=List_of_songs_recorded_by_Bruce_Springsteen.pdf, document_type=PDF} }

What do you see here?
- The score is 0.679..., meaning that the segment matches 67.9% of the question.
- The segment itself contains the specified information: the row for "Adam Raised a Cain" mentions Darkness on the Edge of Town. The correct segment is chosen - this is great.
- The metadata shows the document where the segment comes from.

You also see how the table is transformed into a text segment: it isn't a table anymore. In the source document, the information is formatted as a table. Another thing to notice is where the text segment is split. If you had asked who produced this song, you would get an incomplete answer, because the row is split at column 4. One way to mitigate such splits is sketched below.
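A possible mitigation (not explored further in this post) is to split with an overlap, so that the tail of one segment is repeated at the head of the next and a row cut off at a chunk boundary still appears in full somewhere. The recursive splitter's second argument is exactly that overlap size; the 100 below is just an illustrative value:

Java
// Split into chunks of 500 characters with a 100-character overlap, so a table
// row cut off at the end of one segment also appears at the start of the next
// (at the cost of some duplicated content in the embedding store).
DocumentSplitter documentSplitter = DocumentSplitters.recursive(500, 100);
List<TextSegment> documentSegments = documentSplitter.splitAll(documents);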
Question 2

The following is the result for question 2: "What is the highest chart position of 'Greetings from Asbury Park, NJ' in the US?"

Shell
0.6892728817378977
29. Greetings from Asbury Park, N.J. (LP liner notes). Bruce Springsteen. US: Columbia Records. 1973. KC 31903. 30. Nebraska (LP liner notes). Bruce Springsteen. US: Columbia Records. 1982. TC 38358. 31. Chapter and Verse (CD booklet). Bruce Springsteen. US: Columbia Records. 2016. 88985 35820 2. 32. Born to Run (LP liner notes). Bruce Springsteen. US: Columbia Records. 1975. PC 33795. 33. Tracks (CD box set liner notes). Bruce Springsteen. Europe: Columbia Records. 1998. COL 492605 2 2.
Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/example-files, index=100, file_name=List_of_songs_recorded_by_Bruce_Springsteen.pdf, document_type=PDF} }

The information is found in the correct document, but the wrong text segment is found. This segment comes from the References section and you needed the information from the Songs table, just like for question 1.

Question 3

The following is the result for question 3: "What is the highest chart position of the album 'Tracks' in Canada?"

Shell
0.807258199400863
56. @billboardcharts (November 29, 2021). "Debuts on this week's #Billboard200 (1/2)..." (https://twitter.com/billboardcharts/status/1465346016702566400) (Tweet). Retrieved November 30, 2021 – via Twitter. 57. "ARIA Top 50 Albums Chart" (https://www.aria.com.au/charts/albums-chart/2021-11-29). Australian Recording Industry Association. November 29, 2021. Retrieved November 26, 2021. 58. "Billboard Canadian Albums" (https://www.fyimusicnews.ca/fyi-charts/billboard-canadian-albums).
Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/example-files, index=142, file_name=Bruce_Springsteen_discography.pdf, document_type=PDF} }

The information is found in the correct document, but also here, the segment comes from the References section, while the answer to the question can be found in the Compilation albums table. This can explain some of the wrong answers that were given in the previous post.

Question 4

The following is the result for question 4: "In which year was 'Highway Patrolman' released?"

Shell
0.6867325432140559
"Highway 29" Bruce Springsteen The Ghost of Tom Joad Chuck Plotkin Bruce Springsteen 1995 [17] "Highway Patrolman" Bruce Springsteen Nebraska Bruce Springsteen 1982 [30] "Hitch Hikin' " Bruce Springsteen Western Stars Ron Aniello Bruce Springsteen 2019 [53] "The Hitter" Bruce Springsteen Devils & Dust Brendan O'Brien Chuck Plotkin Bruce Springsteen 2005 [24] "The Honeymooners" Bruce Springsteen Tracks Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt 1998 [33] [76] "House of a Thousand
Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/example-files, index=31, file_name=List_of_songs_recorded_by_Bruce_Springsteen.pdf, document_type=PDF} }

The information is found in the correct document and the correct segment is found. However, it is difficult to retrieve the correct answer because of the formatting of the text segment, and you do not have any context about what the information represents. The column headers are gone, so how should you know that 1982 is the answer to the question?

Question 5

The following is the result for question 5: "Who produced 'All or Nothin’ at All'?"
Shell
0.7036564758755796
Zandt (assistant) 1978 [18] "Addicted to Romance" Bruce Springsteen She Came to Me (soundtrack) Bryce Dessner 2023 [19] [20] "Ain't Good Enough for You" Bruce Springsteen The Promise Jon Landau Bruce Springsteen 2010 [21] [22] "Ain't Got You" Bruce Springsteen Tunnel of Love Jon Landau Chuck Plotkin Bruce Springsteen 1987 [23] "All I'm Thinkin' About" Bruce Springsteen Devils & Dust Brendan O'Brien Chuck Plotkin Bruce Springsteen 2005 [24] "All or Nothin' at All" Bruce Springsteen Human Touch
Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/example-files, index=5, file_name=List_of_songs_recorded_by_Bruce_Springsteen.pdf, document_type=PDF} }

The information is found in the correct document, but again, the segment is split in the row where the answer can be found. This can explain the incomplete answers that were given in the previous post.

Conclusion

Two answers are correct, one is partially correct, and two are wrong.

Embed Markdown Document

What would change when you convert the PDF documents into Markdown files? Tables are probably easier to recognize in Markdown files than in PDF documents, and they allow you to segment the document at the row level instead of at some arbitrary chunk size. Only the parts of the documents that contain the answers to the questions are converted; this means the Studio albums and Compilation albums from the discography and the List of songs recorded.

The segmenting is done as follows:
- Split the document line by line.
- Retrieve the data of the table in the variable dataOnly.
- Save the header of the table in the variable header.
- Create a TextSegment for every row in dataOnly and add the header to the segment.

The source code is as follows:

Java
List<Document> documents = loadDocuments(toPath("markdown-files"));

List<TextSegment> segments = new ArrayList<>();
for (Document document : documents) {
    String[] splittedDocument = document.text().split("\n");
    String[] dataOnly = Arrays.copyOfRange(splittedDocument, 2, splittedDocument.length);
    String header = splittedDocument[0] + "\n" + splittedDocument[1] + "\n";

    for (String splittedLine : dataOnly) {
        segments.add(TextSegment.from(header + splittedLine, document.metadata()));
    }
}

Question 1

The following is the result for question 1: "On which album was 'Adam Raised a Cain' originally released?"

Shell
0.6196628642947255
| Title |Album details| US | AUS | GER | IRE | NLD |NZ |NOR|SWE|UK
|-----------------------------------------------|-------------|---|---|---|---|---|---|---|---|---|
|The Essential Bruce Springsteen|14|41|—|—|5|22|—|4|2|15|
Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_discography_compilation_albums.md, document_type=UNKNOWN} }

The answer is incorrect.

Question 2

The following is the result for question 2: "What is the highest chart position of 'Greetings from Asbury Park, NJ' in the US?"

Shell
0.8229951885990189
| Title |Album details| US | AUS | GER | IRE | NLD |NZ |NOR|SWE|UK
|-----------------------------------------------|-------------|---|---|---|---|---|---|---|---|---|
| Greetings from Asbury Park,N.J.
|60|71|—|—|—|—|—|—|35|41| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_discography_studio_albums.md, document_type=UNKNOWN} } The answer is correct, and the answer can easily be retrieved, as you have the header information for every column. Question 3 The following is the result for question 3: "What is the highest chart position of the album 'Tracks' in Canada?" Shell 0.7646818618182345 | Title |Album details| US | AUS | GER | IRE | NLD |NZ |NOR|SWE|UK |-----------------------------------------------|-------------|---|---|---|---|---|---|---|---|---| |Tracks|27|97|—|63|—|36|—|4|11|50| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_discography_compilation_albums.md, document_type=UNKNOWN} } The answer is correct. Question 4 The following is the result for question 4: "In which year was 'Highway Patrolman' released?" Shell 0.6108392657222184 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Working on the Highway" |Bruce Springsteen| Born in the U.S.A. | Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt |1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } The answer is incorrect. The correct document is found, but the wrong segment is chosen. Question 5 The following is the result for question 5: "Who produced 'All or Nothin’ at All'?" Shell 0.6724577751120745 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| | "All or Nothin' at All" | Bruce Springsteen | Human Touch | Jon Landau Chuck Plotkin Bruce Springsteen Roy Bittan |1992 | Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } The answer is correct and complete this time. Conclusion Three answers are correct and complete. Two answers are incorrect. Note that the incorrect answers are for different questions as before. However, the result is slightly better than with the PDF files. Alternative Questions Let’s build upon this a bit further. You are not using a Large Language Model (LLM) here, which will help you with textual differences between the questions you ask and the interpretation of results. Maybe it helps when you change the question in order to use terminology that is closer to the data in the documents. The source code can be found here. Question 1 Let’s change question 1 from "On which album was 'Adam Raised a Cain' originally released?" to "What is the original release of 'Adam Raised a Cain'?". The column in the table is named original release, so that might make a difference. 
The result is the following: Shell 0.6370094541277747 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| | "Adam Raised a Cain" | Bruce Springsteen | Darkness on the Edge of Town | Jon Landau Bruce Springsteen Steven Van Zandt (assistant) | 1978| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } The answer is correct this time and the score is slightly higher. Question 4: Attempt #1 Question 4 is, "In which year was 'Highway Patrolman' released?" Remember that you only asked for the first relevant result. However, more relevant results can be displayed. Set the maximum number of results to 5. Java List<EmbeddingMatch<TextSegment>> relevantMatches = embeddingStore.findRelevant(queryEmbedding,5); The result is: Shell 0.6108392657222184 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Working on the Highway" |Bruce Springsteen| Born in the U.S.A. | Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt |1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6076896858171996 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Turn! Turn! Turn!" (with Roger McGuinn) | Pete Seeger † | Magic Tour Highlights (EP) | John Cooper | 2008| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6029946650419344 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Darlington County" | Bruce Springsteen | Born in the U.S.A. 
| Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt | 1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6001672430441461 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Downbound Train" | Bruce Springsteen | Born in the U.S.A. | Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt |1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.5982557901838741 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Highway Patrolman" | Bruce Springsteen | Nebraska | Bruce Springsteen | 1982| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } As you can see, Highway Patrolman is a result, but only the fifth result. That is a bit strange, though. Question 4: Attempt #2 Let’s change question 4 from, "In which year was 'Highway Patrolman' released?" to, "In which year was the song 'Highway Patrolman' released?" So, you add "the song" to the question. The result is: Shell 0.6506125707025556 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Working on the Highway" |Bruce Springsteen| Born in the U.S.A. 
| Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt |1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.641000538311824 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Raise Your Hand" (live) (Eddie Floyd cover) | Steve Cropper Eddie Floyd Alvertis Isbell † | Live 1975–85 | Jon Landau Chuck Plotkin Bruce Springsteen |1986 | Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6402738046796352 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Darlington County" | Bruce Springsteen | Born in the U.S.A. | Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt | 1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6362427185719677 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Highway Patrolman" | Bruce Springsteen | Nebraska | Bruce Springsteen | 1982| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.635837703599965 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Wreck on the Highway"| Bruce Springsteen |The River | Jon Landau Bruce Springsteen Steven Van Zandt |1980 | Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } Now Highway Patrolman is the fourth result. It is getting better. Question 4: Attempt #3 Let’s add the words "of the album Nebraska" to question 4. The question becomes, "In which year was the song 'Highway Patrolman' of the album Nebraska released?" 
The result is: Shell 0.6468954949440158 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Working on the Highway" |Bruce Springsteen| Born in the U.S.A. | Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt |1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6444919056791143 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Darlington County" | Bruce Springsteen | Born in the U.S.A. | Jon Landau Chuck Plotkin Bruce Springsteen Steven Van Zandt | 1984| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6376680100362238 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Highway Patrolman" | Bruce Springsteen | Nebraska | Bruce Springsteen | 1982| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } 0.6367565537138745 | Title |Album details| US | AUS | GER | IRE | NLD |NZ |NOR|SWE|UK |-----------------------------------------------|-------------|---|---|---|---|---|---|---|---|---| |The Essential Bruce Springsteen|14|41|—|—|5|22|—|4|2|15| Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_discography_compilation_albums.md, document_type=UNKNOWN} } 0.6364950606665447 | song | writer(s) | original release | Producer(s) |year| |-----------------------------------------------------------------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------|-----------------------------------------------------------------------|-| |"Raise Your Hand" (live) (Eddie Floyd cover) | Steve Cropper Eddie Floyd Alvertis Isbell † | Live 1975–85 | Jon Landau Chuck Plotkin Bruce Springsteen |1986 | Metadata { metadata = {absolute_directory_path=/<project directory>/mylangchain4jplanet/target/classes/markdown-files, file_name=bruce_springsteen_list_of_songs_recorded.md, document_type=UNKNOWN} } Again, an improvement: Highway Patrolman is now listed as the third result. Still, it is strange why it is not listed as the first result. However, by adding more information, it ranks higher in the result list. This is as expected. 
Conclusion
Rephrasing the question with terminology that is closer to the source data helps to get a better result. Adding more context to the question also helps. Finally, displaying more results gives you more insight and lets you pick the correct answer from the result list.

Markdown Embedding Combined With LLM
The conclusions up until now are:
- The format of the documents and the way the documents are segmented and embedded have a significant influence on the results.
- If the question uses terminology that is close to the data in the documents, better results are achieved.
In the previous blog, PDF files were used as source data. The first point has now been addressed by using Markdown files and better segmenting and embedding. The second point can probably be addressed by using an LLM. Let's see what happens when you combine the embedding in this post with an LLM from the previous post (a sketch of the wiring is shown after the results below). The full source code can be found on GitHub. The questions and results are:

Q: On which album was "Adam Raised a Cain" originally released?
A: The album "Adam Raised a Cain" was originally released on The Essential Bruce Springsteen album in 1995.
This answer is wrong, but it corresponds to the first embedded result earlier.

Q: What is the highest chart position of "Greetings from Asbury Park, NJ" in the US?
A: The highest chart position of "Greetings from Asbury Park, NJ" in the US is 60.
This answer is correct.

Q: What is the highest chart position of the album "Tracks" in Canada?
A: The album "Tracks" by Metallica has not reached the highest chart position in Canada. According to the provided information, the highest chart position it reached in Canada was #27.
This answer is wrong. The first embedded result earlier was correct, so it is strange that the wrong answer is given here.

Q: In which year was "Highway Patrolman" released?
A: The song "Highway Patrolman" was released in 1984.
The answer is wrong, but it can be explained: the first embedded result earlier was "Working on the Highway," and that song was released in 1984.

Q: In which year was "Higway Patrolman" released?
A: The song "Highway Patrolman" was released in 1984.
This question deliberately contains a typo. The result is similar to the previous one, so the behavior is at least consistent.

Q: Who produced "All or Nothin' at All"?
A: The song "All or Nothin' at All" was produced by Bruce Springsteen, Jon Landau, Chuck Plotkin, and Roy Bittan. It was originally released on Springsteen's 1992 album "Human Touch."
This answer is correct and complete. With the PDFs as source data, the answer was always incomplete.

Overall, the results are worse than when the PDF files were used as source documents. However, when a correct answer is given, it is complete, and the answers are consistent: a minor typo no longer changes the answer like it did before.
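As announced above, here is a minimal sketch of how the embedding store and the LLM can be wired together with LangChain4j. The variable names chatLanguageModel, embeddingStore, and embeddingModel are assumptions, as is the maximum of five retrieved segments; the actual wiring is in the GitHub sources.

Java
import dev.langchain4j.chain.ConversationalRetrievalChain;
import dev.langchain4j.retriever.EmbeddingStoreRetriever;

// Combine the embedding store from this post with the chat model from the previous post.
// The retriever fetches the most relevant text segments and hands them to the model
// together with the question.
ConversationalRetrievalChain chain = ConversationalRetrievalChain.builder()
        .chatLanguageModel(chatLanguageModel)
        .retriever(EmbeddingStoreRetriever.from(embeddingStore, embeddingModel, 5))
        .build();

String answer = chain.execute("In which year was \"Highway Patrolman\" released?");
System.out.println(answer);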
Markdown Embedding Combined With System Messages
In the previous blog, the final solution also included adding system messages to the chat memory. Let's see what happens when you combine this with the Markdown source documents. The full source code can be found on GitHub. Below is a small extract of the system messages (a sketch of how these messages are sent to the model follows after the results).

Java
ArrayList<ChatMessage> chatMessages = new ArrayList<>();
chatMessages.add(new SystemMessage("Use the provided documents to answer the questions."));
chatMessages.add(new SystemMessage("If the answer cannot be found in the documents, write \"I could not find an answer\"."));

The questions and results are:

Q: On which album was "Adam Raised a Cain" originally released?
A: The album "Adam Raised a Cain" was originally released on The Essential Bruce Springsteen in 1995.
This answer is wrong and similar to the previous paragraph.

Q: What is the highest chart position of "Greetings from Asbury Park, NJ" in the US?
A: Based on the provided information, the highest chart position of "Greetings from Asbury Park, N.J." in the US is 60.
This answer is correct and similar to the previous paragraph.

Q: What is the highest chart position of the album "Tracks" in Canada?
A: Based on the information provided, the highest chart position of the album "Tracks" in Canada is 64.
This answer is still wrong. However, Metallica is not mentioned anymore, so it is a bit less wrong than in the previous paragraph.

Q: In which year was "Highway Patrolman" released?
A: The song "Highway Patrolman" was released in 1984.
The answer is wrong and identical to the previous paragraph.

Q: In which year was "Higway Patrolman" released?
A: The song "Highway Patrolman" was released in 1984.
The answer is wrong and identical to the previous paragraph.

Q: Who produced "All or Nothin' at All"?
A: The song "All or Nothin' at All" was produced by Bruce Springsteen, Jon Landau, Chuck Plotkin, and Roy Bittan. It was originally released on the album "Human Touch" in 1992.
This answer is correct, complete, and similar to the previous paragraph.

Adding system messages did not have any influence on the results.
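For completeness, here is a sketch of how these system messages, the retrieved document text, and the question can be sent to the model. The variables chatLanguageModel and relevantText are assumptions for illustration; the actual code is on GitHub.

Java
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.output.Response;

// Continue with the chatMessages list from the extract above: add the text of the
// retrieved segments and the user question, then ask the model for an answer.
chatMessages.add(new UserMessage("Documents:\n" + relevantText));
chatMessages.add(new UserMessage("In which year was \"Highway Patrolman\" released?"));

Response<AiMessage> response = chatLanguageModel.generate(chatMessages);
System.out.println(response.content().text());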
Overall Conclusion
So, what did we learn from all of this?
- The way documents are read and embedded seems to have the largest influence on the results.
- An advantage of this approach is that you are able to display a number of results, which allows you to determine which result is the correct one.
- Rephrasing your question to use the terminology present in the text segments helps to get a better result.
- Querying a vector store is very fast. Embedding costs some time, but you only need to do this once. Using an LLM takes a lot more time to retrieve a result when you do not use a GPU.
An interesting resource to read is Deconstructing RAG, a blog from LangChain. As improvements are made in this area, better results will follow.