Welcome to the Data Engineering category of DZone, where you will find all the information you need for AI/ML, big data, data, databases, and IoT. As you determine the first steps for new systems or reevaluate existing ones, you're going to require tools and resources to gather, store, and analyze data. The Zones within our Data Engineering category contain resources that will help you expertly navigate through the SDLC Analysis stage.
Artificial intelligence (AI) and machine learning (ML) are two fields that work together to create computer systems capable of perception, recognition, decision-making, and translation. Separately, AI is the ability for a computer system to mimic human intelligence through math and logic, and ML builds off AI by developing methods that "learn" through experience and do not require instruction. In the AI/ML Zone, you'll find resources ranging from tutorials to use cases that will help you navigate this rapidly growing field.
Big data comprises datasets that are massive, varied, complex, and can't be handled traditionally. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly more crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Data is at the core of software development. Think of it as information stored in anything from text documents and images to entire software programs, and these bits of information need to be processed, read, analyzed, stored, and transported throughout systems. In this Zone, you'll find resources covering the tools and strategies you need to handle data properly.
A database is a collection of structured data that is stored in a computer system, and it can be hosted on-premises or in the cloud. As databases are designed to enable easy access to data, our resources are compiled here for smooth browsing of everything you need to know from database management systems to database languages.
IoT, or the Internet of Things, is a technological field that makes it possible for users to connect devices and systems and exchange data over the internet. Through DZone's IoT resources, you'll learn about smart devices, sensors, networks, edge computing, and many other technologies — including those that are now part of the average person's daily life.
Enterprise AI
In recent years, artificial intelligence has become less of a buzzword and more of an adopted process across the enterprise. With that comes a growing need to increase operational efficiency as customer demands arise. AI platforms have grown increasingly sophisticated, creating a need to establish guidelines and ownership. In DZone's 2022 Enterprise AI Trend Report, we explore MLOps, explainability, and how to select the best AI platform for your business. We also share a tutorial on how to create a machine learning service using Spring Boot and how to deploy AI with an event-driven platform. The goal of this Trend Report is to better inform the developer audience on practical tools and design paradigms, new technologies, and the overall operational impact of AI within the business. This is a technology space that's constantly shifting and evolving. As part of our December 2022 re-launch, we've added new articles pertaining to knowledge graphs, a solutions directory for popular AI tools, and more.
In a recent survey by Great Expectations, 91% of respondents revealed that data quality issues had some level of impact on their organization. This highlights the critical importance of data quality in data engineering pipelines. Organizations can avoid costly mistakes, make better decisions, and ultimately drive better business outcomes by ensuring that data is accurate, consistent, and reliable. However, 41% of respondents in the survey also reported that a lack of tooling was a major contributing factor to data quality issues. Employing data quality management tools in data pipelines can automate various processes required to ensure that the data remains fit for purpose across analytics, data science, and machine learning use cases. These tools also assess existing data pipelines, identify quality bottlenecks, and automate various remediation steps. To help organizations find the best tools, this article lists some popular tools for automating data quality checks in data engineering pipelines.
Importance of Data Quality Checks in Data Engineering Pipelines
Data quality tools are as essential as other data engineering tools, such as those for integration, warehousing, processing, storage, governance, and security. Here are several reasons why data quality checks are essential in data pipelines:
Accuracy: They ensure that the data is accurate and error-free. This is crucial for making informed decisions based on the data. If the data is inaccurate, it can lead to incorrect conclusions and poor business decisions.
Completeness: They ensure that all required data is present in the pipeline and that the pipeline is free from duplicate data. Incomplete data can result in missing insights, leading to incorrect or incomplete analysis.
Consistency: Data quality checks ensure consistency across different sources and pipelines. Inconsistent data can lead to discrepancies in the analysis and affect the overall reliability of the data.
Compliance: They ensure the data complies with regulatory requirements and industry standards. Non-compliance can result in legal and financial consequences.
Efficiency: Data quality checks help identify and fix data issues early in the pipeline, reducing the time and effort required for downstream processing and analysis.
The data quality checks in the ingestion, storage, ETL, and processing layers are usually similar, regardless of the business needs and differing industries. The goal is to ensure that data is not lost or degraded while moving from source to target systems.
Why Automate?
Here's how automating data testing and data quality checks can enhance the performance of data engineering pipelines:
By testing data at every pipeline stage with automation, data engineers can identify and address issues early, preventing errors and data quality issues from being propagated downstream.
Automation saves time and reduces the manual effort required to validate data. This, in turn, speeds up the development cycle and enables faster time-to-market.
Automation tools can take over repetitive tasks such as data validation, reducing the time and effort required to perform these tasks manually. This increases the efficiency of the data engineering pipeline and allows data engineers to focus on more complex tasks.
Data engineers can ensure that their pipelines and storage comply with regulatory and legal requirements and avoid costly penalties by automatically testing for data privacy, security, and compliance issues.
Detecting errors early through automated checks reduces the risk of data processing errors and data quality issues. This saves time, money, and resources that would otherwise be spent on fixing issues downstream. List of Top Tools to Automate Data Quality Check Each data quality management tool has its own set of capabilities and workflows for automation. Most tools include features for data profiling, cleansing, tracking data lineage, and standardizing data. Some may also have parsing and monitoring capabilities or more. Here are some popular tools with their features: 1. Great Expectations Great Expectations provides a flexible way to define, manage, and automate data quality checks in data engineering pipelines. It supports various data sources, including SQL, Pandas, Spark, and more. Key features: Mechanisms for a shared understanding of data. Faster data discovery Integrates with your existing stack. Essential security and governance. Integrates with other data engineering tools such as AWS Glue, Snowflake, BigQuery, etc. Pricing: Open-source Popular companies using it: Moody’s Analytics, Calm, CarNext.com 2. IBM InfoSphere Information Server for Data Quality IBM InfoSphere Information Server for Data Quality offers end-to-end data quality tools for data cleansing, automating source data investigation, data standardization, validation, and more. It also enables you to continuously monitor and analyze data quality to prevent incorrect and inconsistent data. Key features: Designed to be scalable and handle large volumes of data across distributed environments. Offers flexible deployment options. Helps maintain data lineage. Supports various data sources and integration with other IBM data management products. Pricing: Varied pricing Popular companies using it: Toyota, Mastercard, UPS 3. Apache Airflow Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It provides features like task dependencies, retries, and backfills to automate data engineering pipelines and can be used for performing data quality checks as well. Key features: Modular architecture that can scale to infinity. Defined in Python, which allows for dynamic pipeline generation. Robust integrations with many third-party services, including AWS, Azure, GCP, and other next-gen technologies. Pricing: Open-source Popular companies using it: Airbnb, PayPal, Slack 4. Apache Nifi Apache Nifi provides a visual interface for designing and automating data engineering pipelines. It has built-in processors for performing data quality checks, such as validating data schema, checking for null values, and ensuring data completeness. Key features: Browser-based UI Data provenance Extensible architecture Supports powerful and scalable directed graphs (DAGs) of data routing, transformation, and system mediation logic. Pricing: Open-source Popular companies using it: Adobe, Capital One, The Weather Company 5. Talend Talend is a comprehensive platform that provides data quality solutions for data profiling, cleansing, enrichment, and standardization across your systems. It supports various data sources, including databases, files, and cloud-based platforms. Key features: Intuitive UI ML-powered recommendations to address data quality issues. Real-time capabilities Automates better data Pricing: Varied pricing plans Popular companies using it: Beneva, Air France, Allianz 6. 
Informatica Data Quality Informatica Data Quality is an enterprise-level data quality tool with data profiling, cleansing, and validation features. It also provides other capabilities such as data de-duplication, enrichment, and consolidation. Key features: Reliable data quality powered by AI. Reusability (of rules and accelerators) to save time and resources. Exception management through an automated process. Pricing: IPU (Informatica Processing Unit) pricing Popular companies using it: Lowell, L.A. Care, HSB
Conclusion The above is not a definitive list. There are many other popular tools, such as Precisely Trillium, Ataccama One, SAS Data Quality, etc. Choosing the right data quality tools for a pipeline involves considering several factors: understanding your data pipeline and quality requirements, evaluating available tools and their automation capabilities, considering cost, ROI, and the ability to integrate with your current stack, and testing the tool with your pipeline.
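To make the automation idea concrete, here is a minimal sketch of a programmatic data quality check using Great Expectations' legacy Pandas API; the file path, column names, and thresholds are illustrative assumptions, and newer Great Expectations releases use a Data Context and validator workflow instead.

Python

import great_expectations as ge

# Load a CSV as a Great Expectations dataset (legacy, Pandas-style API).
# "orders.csv" and the column names below are placeholders.
df = ge.read_csv("orders.csv")

# Declare expectations about the data.
df.expect_column_values_to_not_be_null("order_id")
df.expect_column_values_to_be_unique("order_id")
df.expect_column_values_to_be_between("amount", min_value=0, max_value=None)

# Validate all declared expectations and fail the run if any are not met.
results = df.validate()
if not results["success"]:
    raise ValueError(f"Data quality check failed: {results}")

In a pipeline, a check like this would typically run right after ingestion and before downstream transformations, so that bad data is caught early rather than propagated.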
To get more clarity about ISR in Apache Kafka, we should first carefully examine the replication process in the Kafka broker. In short, replication means having multiple copies of our data spread across multiple brokers. Maintaining identical copies of the data on different brokers makes high availability possible: if one or more brokers in a multi-node Kafka cluster go down or become unreachable, the remaining brokers can still serve requests. For this reason, it is mandatory to specify how many copies of the data we want to maintain in the multi-node Kafka cluster while creating a topic. This is termed the replication factor, and that's why it can't be more than one when creating a topic on a single-node Kafka cluster. The number of replicas specified while creating a topic can be changed in the future based on node availability in the cluster. On a single-node Kafka cluster, however, we can have more than one partition in the broker because each topic can have one or more partitions. Partitions are simply subdivisions of a topic into multiple parts across the brokers in the cluster, and each partition holds the actual data (messages). Internally, each partition is a single log file to which records are written in an append-only fashion. Based on the provided number, the topic is internally split into that many partitions at creation time. Thanks to partitioning, messages can be distributed in parallel among several brokers in the cluster. Kafka scales to accommodate several consumers and producers at once by employing this parallelism technique, which enables linear scaling for both consumers and producers. Even though more partitions in a Kafka cluster provide higher throughput, they come with pitfalls too. Briefly, more file handles are created as we increase the number of partitions, since each partition maps to a directory in the broker's file system. Having discussed replication and partitions in Apache Kafka, we can now understand ISR better. The in-sync replicas (ISRs) are simply a partition's replicas that are "in sync" with the leader, and the leader is the replica to which all requests from clients and other Kafka brokers go. Replicas that are not the leader are termed followers, and a follower that is in sync with the leader is called an ISR (in-sync replica). For example, if we set the topic's replication factor to 3, Kafka will store the topic-partition log in three different places and will only consider a record to be committed once all three of these replicas have verified that they have written the record to disk successfully and sent back an acknowledgment to the leader. In a multi-broker (multi-node) Kafka cluster (please click here to read how a multi-node Kafka cluster can be created), one broker is selected as the leader for a partition, and this leader broker is responsible for handling all the read and write requests for that partition while the followers (on other brokers) passively replicate the leader to achieve data consistency. Each partition can have only one leader at a time, which handles all reads and writes of records for that partition. The followers replicate the leader and take over if the leader dies. By leveraging Apache ZooKeeper, Kafka internally selects one broker's replica of a partition as the leader, and if the leader of that partition fails (due to an outage of that broker), Kafka chooses a new leader from among the in-sync replicas.
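To make these moving parts concrete, here is a small sketch that creates a replicated topic and then produces with acks="all", so the leader waits for the in-sync replicas to acknowledge a write. It uses the kafka-python client purely as an illustration; the broker address, topic name, and the min.insync.replicas value (discussed below) are assumptions.

Python

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

# Assumed three-broker cluster reachable via this bootstrap address.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Replication factor 3; only 2 replicas must be in sync for a write to succeed.
admin.create_topics([
    NewTopic(
        name="orders",
        num_partitions=3,
        replication_factor=3,
        topic_configs={"min.insync.replicas": "2"},
    )
])

# acks="all": the leader waits for all current in-sync replicas to acknowledge.
producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all")
producer.send("orders", b"example message")
producer.flush()

With a configuration like this, the topic keeps accepting writes if one of the three brokers goes down, but if two go down producers will see "not enough replicas" errors, which is exactly the availability trade-off described next.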
When all of the ISRs for a partition have written a record to their logs, the record is said to be "committed," and consumers can only read committed records. The minimum in-sync replica count specifies the minimum number of replicas that must be present for the producer to successfully send records to a partition. Although a high minimum in-sync replica count gives stronger durability, it can have an adverse effect on availability: data availability is reduced if the minimum number of in-sync replicas is not available at publish time. For example, if we have an operational three-node Kafka cluster with the minimum in-sync replicas configuration set to three, and one node subsequently goes down or becomes unreachable, the remaining two nodes will not be able to accept any data/messages from producers, because only two in-sync replicas are active/available across the brokers. The third replica, which existed on the dead or unavailable broker, cannot acknowledge to the leader that it is synced with the latest data the way the other two live replicas on the available brokers can. Hope you have enjoyed this read. Please like and share if you feel this composition is valuable.
Exploratory Data Analysis (EDA) is an essential step in any data science project, as it allows us to understand the data, detect patterns, and identify potential issues. In this article, we will explore how to use two popular Python libraries, Pandas and Matplotlib, to perform EDA. Pandas is a powerful library for data manipulation and analysis, while Matplotlib is a versatile library for data visualization. We will cover the basics of loading data into a pandas DataFrame, exploring the data using pandas functions, cleaning the data, and finally, visualizing the data using Matplotlib. By the end of this article, you will have a solid understanding of how to use Pandas and Matplotlib to perform EDA in Python. Importing Libraries and Data Importing Libraries To use the pandas and Matplotlib libraries in your Python code, you need to first import them. You can do this using the import statement followed by the name of the library. Python python import pandas as pd import matplotlib.pyplot as plt In this example, we're importing pandas and aliasing it as 'pd', which is a common convention in the data science community. We're also importing matplotlib.pyplot and aliasing it as 'plt'. By importing these libraries, we can use their functions and methods to work with data and create visualizations. Loading Data Once you've imported the necessary libraries, you can load the data into a pandas DataFrame. Pandas provides several methods to load data from various file formats, including CSV, Excel, JSON, and more. The most common method is read_csv, which reads data from a CSV file and returns a DataFrame. Python python# Load data into a pandas DataFrame data = pd.read_csv('path/to/data.csv') In this example, we're loading data from a CSV file located at 'path/to/data.csv' and storing it in a variable called 'data'. You can replace 'path/to/data.csv' with the actual path to your data file. By loading data into a pandas DataFrame, we can easily manipulate and analyze the data using pandas' functions and methods. The DataFrame is a 2-dimensional table-like data structure that allows us to work with data in a structured and organized way. It provides functions for selecting, filtering, grouping, aggregating, and visualizing data. Data Exploration head() and tail() The head() and tail() functions are used to view the first few and last few rows of the data, respectively. By default, these functions display the first/last five rows of the data, but you can specify a different number of rows as an argument. Python python# View the first 5 rows of the data print(data.head()) # View the last 10 rows of the data print(data.tail(10)) info() The info() function provides information about the DataFrame, including the number of rows and columns, the data types of each column, and the number of non-null values. This function is useful for identifying missing values and determining the appropriate data types for each column. Python python# Get information about the data print(data.info()) describe() The describe() function provides summary statistics for numerical columns in the DataFrame, including the count, mean, standard deviation, minimum, maximum, and quartiles. This function is useful for getting a quick overview of the distribution of the data. Python python# Get summary statistics for the data print(data.describe()) value_counts() The value_counts() function is used to count the number of occurrences of each unique value in a column. 
This function is useful for identifying the frequency of specific values in the data. Python python# Count the number of unique values in a column print(data['column_name'].value_counts()) These are just a few examples of pandas functions you can use to explore data. There are many other functions you can use depending on your specific data exploration needs, such as isnull() to check for missing values, groupby() to group data by a specific column, corr() to calculate correlation coefficients between columns, and more. Data Cleaning isnull() The isnull() function is used to check for missing or null values in the DataFrame. It returns a DataFrame of the same shape as the original, with True values where the data is missing and False values where the data is present. You can use the sum() function to count the number of missing values in each column. Python python# Check for missing values print(data.isnull().sum()) dropna() The dropna() function is used to remove rows or columns with missing or null values. By default, this function removes any row that contains at least one missing value. You can use the subset argument to specify which columns to check for missing values and the how argument to specify whether to drop rows with any missing values or only rows where all values are missing. Python python# Drop rows with missing values data = data.dropna() drop_duplicates() The drop_duplicates() function is used to remove duplicate rows from the DataFrame. By default, this function removes all rows that have the same values in all columns. You can use the subset argument to specify which columns to check for duplicates. Python python# Drop duplicate rows data = data.drop_duplicates() replace() The replace() function is used to replace values in a column with new values. You can specify the old value to replace and the new value to replace it with. This function is useful for handling data quality issues such as misspellings or inconsistent formatting. Python python# Replace values in a column data['column_name'] = data['column_name'].replace('old_value', 'new_value') These are just a few examples of pandas functions you can use to clean data. There are many other functions you can use depending on your specific data-cleaning needs, such as fillna() to fill missing values with a specific value or method, astype() to convert data types of columns, clip() to trim outliers, and more. Data cleaning plays a crucial role in preparing data for analysis, and automating the process can save time and ensure data quality. In addition to the pandas functions mentioned earlier, automation techniques can be applied to streamline data-cleaning workflows. For instance, you can create reusable functions or pipelines to handle missing values, drop duplicates, and replace values across multiple datasets. Moreover, you can leverage advanced techniques like imputation to fill in missing values intelligently or regular expressions to identify and correct inconsistent formatting. By combining the power of pandas functions with automation strategies, you can efficiently clean and standardize data, improving the reliability and accuracy of your exploratory data analysis (EDA). Data Visualization Data visualization is a critical component of data science, as it allows us to gain insights from data quickly and easily. Matplotlib is a popular Python library for creating a wide range of data visualizations, including scatter plots, line plots, bar charts, histograms, box plots, and more.
Here are a few examples of how to create these types of visualizations using Matplotlib: Scatter Plot A scatter plot is used to visualize the relationship between two continuous variables. You can create a scatter plot in Matplotlib using the scatter() function. Python python# Create a scatter plot plt.scatter(data['column1'], data['column2']) plt.xlabel('Column 1') plt.ylabel('Column 2') plt.show() In this example, we're creating a scatter plot with column1 on the x-axis and column2 on the y-axis. We're also adding labels to the x-axis and y-axis using the xlabel() and ylabel() functions. Histogram A histogram is used to visualize the distribution of a single continuous variable. You can create a histogram in Matplotlib using the hist() function. Python python# Create a histogram plt.hist(data['column'], bins=10) plt.xlabel('Column') plt.ylabel('Frequency') plt.show() In this example, we're creating a histogram of the column variable with 10 bins. We're also adding labels to the x-axis and y-axis using the xlabel() and ylabel() functions. Box Plot A box plot is used to visualize the distribution of a single continuous variable and to identify outliers. You can create a box plot in Matplotlib using the boxplot() function. Python python# Create a box plot plt.boxplot(data['column']) plt.ylabel('Column') plt.show() In this example, we're creating a box plot of the column variable. We're also adding a label to the y-axis using the ylabel() function. These are just a few examples of what you can do with Matplotlib for data visualization. There are many other functions and techniques you can use, depending on the specific requirements of your project. Conclusion Exploratory data analysis (EDA) is a crucial step in any data science project, and Python provides powerful tools to perform EDA effectively. In this article, we have learned how to use two popular Python libraries, Pandas and Matplotlib, to load, explore, clean, and visualize data. Pandas provides a flexible and efficient way to manipulate and analyze data, while Matplotlib provides a wide range of options to create visualizations. By leveraging these two libraries, we can gain insights from data quickly and easily. With the skills and techniques learned in this article, you can start performing EDA on your own datasets and uncover valuable insights that can drive data-driven decision-making.
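As a compact recap of the workflow above, here is a minimal end-to-end sketch that loads, inspects, cleans, and visualizes a dataset in one pass; the file path and column names are placeholders.

Python

import pandas as pd
import matplotlib.pyplot as plt

# Load (placeholder path)
data = pd.read_csv("path/to/data.csv")

# Explore
print(data.info())
print(data.describe())

# Clean: drop missing values and duplicate rows
data = data.dropna().drop_duplicates()

# Visualize: a histogram of one column and a scatter plot of two others
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
axes[0].hist(data["column"], bins=10)
axes[0].set_xlabel("Column")
axes[0].set_ylabel("Frequency")
axes[1].scatter(data["column1"], data["column2"])
axes[1].set_xlabel("Column 1")
axes[1].set_ylabel("Column 2")
plt.tight_layout()
plt.show()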
As technology professionals, we are already aware that our world is increasingly data-driven. This is especially true in the realm of financial markets, where algorithmic trading has become the norm, leveraging complex algorithms to execute trades at speeds and frequencies that far outstrip human capabilities. In this world where milliseconds can mean the difference between profit and loss, algorithmic trading provides an edge by making trading more systematic and less influenced by human emotional biases. But what if we could take this a step further? What if our trading algorithms could learn from their mistakes, adapt to new market conditions, and continually improve their performance over time? This is where reinforcement learning, a cutting-edge field in artificial intelligence, comes into play. Reinforcement learning (RL) is an area of machine learning that's focused on making decisions. It is about learning from interaction with an environment to achieve a goal, often formulated as a game where the RL agent learns to make moves to maximize its total reward. It is a technology that is now being applied to a variety of problems, from self-driving cars to resource allocation in computer networks. But reinforcement learning's potential remains largely untapped in the world of algorithmic trading. This is surprising, given that trading is essentially a sequential decision-making problem, which is exactly what reinforcement learning is designed to handle. In this article, we will delve into how reinforcement learning can enhance algorithmic trading, explore the challenges involved, and discuss the future of this exciting intersection of AI and finance. Whether you're a data scientist interested in applying your skills to financial markets, or a technology enthusiast curious about the practical applications of reinforcement learning, this article has something for you. Understanding Algorithmic Trading Algorithmic trading, also known as algo-trading or black-box trading, utilizes complex formulas and high-speed, computer-programmed instructions to execute large orders in financial markets with minimal human intervention. It is a practice that has revolutionized the finance industry and is becoming increasingly prevalent in today's digital age. At its core, algorithmic trading is about making the trading process more systematic and efficient. It involves the use of sophisticated mathematical models to make lightning-fast decisions about when, how, and what to trade. This ability to execute trades at high speeds and high volumes offers significant advantages, including reduced risk of manual errors, improved order execution speed, and the ability to backtest trading strategies on historical data. In addition, algorithmic trading can implement complex strategies that would be impossible for humans to execute manually. These strategies can range from statistical arbitrage (exploiting statistical patterns in prices) to mean reversion (capitalizing on price deviations from long-term averages). An important aspect of algorithmic trading is that it removes emotional human influences from the trading process. Decisions are made based on pre-set rules and models, eliminating the potential for human biases or emotions to interfere with trading decisions. This can lead to more consistent and predictable trading outcomes. However, as powerful as algorithmic trading is, it is not without its challenges. One of the primary difficulties lies in the development of effective trading algorithms.
These algorithms must be robust enough to handle a wide range of market conditions and flexible enough to adapt to changing market dynamics. They also need to be able to manage risk effectively, a task that becomes increasingly challenging as the speed and volume of trading increase. This is where reinforcement learning can play a critical role. With its ability to learn from experience and adapt its strategies over time, reinforcement learning offers a promising solution to the challenges faced by traditional algorithmic trading strategies. In the next section, we will delve deeper into the principles of reinforcement learning and how they can be applied to algorithmic trading. The Basics of Reinforcement Learning Reinforcement Learning (RL) is a subfield of artificial intelligence that focuses on decision-making processes. In contrast to other forms of machine learning, reinforcement learning models learn by interacting with their environment and receiving feedback in the form of rewards or penalties. The fundamental components of a reinforcement learning system are the agent, the environment, states, actions, and rewards. The agent is the decision-maker, the environment is what the agent interacts with, states are the situations the agent finds itself in, actions are what the agent can do, and rewards are the feedback the agent gets after taking an action. One key concept in reinforcement learning is the idea of exploration vs exploitation. The agent needs to balance between exploring the environment to find out new information and exploiting the knowledge it already has to maximize the rewards. This is known as the exploration-exploitation tradeoff. Another important aspect of reinforcement learning is the concept of a policy. A policy is a strategy that the agent follows while deciding on an action from a particular state. The goal of reinforcement learning is to find the optimal policy, which maximizes the expected cumulative reward over time. Reinforcement learning has been successfully applied in various fields, from game playing (like the famous AlphaGo) to robotics (for teaching robots new tasks). Its power lies in its ability to learn from trial and error and improve its performance over time. In the context of algorithmic trading, the financial market can be considered as the environment, the trading algorithm as the agent, the market conditions as the states, the trading decisions (buy, sell, hold) as the actions, and the profit or loss from the trades as the rewards. Applying reinforcement learning to algorithmic trading means developing trading algorithms that can learn and adapt their trading strategies based on feedback from the market, with the aim of maximizing the cumulative profit. However, implementing reinforcement learning in trading comes with its own unique challenges, which we will explore in the following sections. The Intersection of Algorithmic Trading and Reinforcement Learning The intersection of algorithmic trading and reinforcement learning represents an exciting frontier in the field of financial technology. At its core, the idea is to create trading algorithms that can learn from past trades and iteratively improve their trading strategies over time. In a typical reinforcement learning setup for algorithmic trading, the agent (the trading algorithm) interacts with the environment (the financial market) by executing trades (actions) based on the current market conditions (state). 
The result of these trades, in terms of profit or loss, serves as the reward or penalty, guiding the algorithm to adjust its strategy. One of the key advantages of reinforcement learning in this context is its ability to adapt to changing market conditions. Financial markets are notoriously complex and dynamic, with prices affected by a myriad of factors, from economic indicators to geopolitical events. A trading algorithm that can learn and adapt in real-time has a significant advantage over static algorithms. For example, consider a sudden market downturn. A static trading algorithm might continue executing trades based on its pre-programmed strategy, potentially leading to significant losses. In contrast, a reinforcement learning-based algorithm could recognize the change in market conditions and adapt its strategy accordingly, potentially reducing losses or even taking advantage of the downturn to make profitable trades. Another advantage of reinforcement learning in trading is its ability to handle high-dimensional data and make decisions based on complex, non-linear relationships. This is especially relevant in today's financial markets, where traders have access to vast amounts of data, from price histories to social media sentiment. For instance, a reinforcement learning algorithm could be trained to take into account not just historical price data, but also other factors such as trading volume, volatility, and even news articles or tweets, to make more informed trading decisions. Challenges and Solutions of Implementing Reinforcement Learning in Algorithmic Trading While the potential benefits of using reinforcement learning in algorithmic trading are significant, it's also important to understand the challenges and complexities associated with its implementation. Overcoming the Curse of Dimensionality The curse of dimensionality refers to the exponential increase in computational complexity as the number of features (dimensions) in the dataset grows. For a reinforcement learning model in trading, each dimension could represent a market factor or indicator, and the combination of all these factors constitutes the state space, which can become enormous. One approach to mitigating the curse of dimensionality is through feature selection, which involves identifying and selecting the most relevant features for the task at hand. By reducing the number of features, we can effectively shrink the state space, making the learning problem more tractable. Python from sklearn.feature_selection import SelectKBest, mutual_info_regression # Assume X is the feature matrix, and y is the target variable k = 10 # Number of top features to select selector = SelectKBest(mutual_info_regression, k=k) X_reduced = selector.fit_transform(X, y) Another approach is dimensionality reduction, such as Principal Component Analysis (PCA) or t-distributed Stochastic Neighbor Embedding (t-SNE). These techniques transform the original high-dimensional data into a lower-dimensional space, preserving as much of the important information as possible. Python from sklearn.decomposition import PCA # Assume X is the feature matrix n_components = 5 # Number of principal components to keep pca = PCA(n_components=n_components) X_reduced = pca.fit_transform(X) Handling Uncertainty and Noise Financial markets are inherently noisy and unpredictable, with prices influenced by numerous factors. To address this, we can incorporate techniques that manage uncertainty into our reinforcement learning model. 
For example, Bayesian methods can be used to represent and manipulate uncertainties in the model. Additionally, reinforcement learning algorithms like Q-learning and SARSA can be used, which learn an action-value function and are known to handle environments with a high degree of uncertainty. Preventing Overfitting Overfitting happens when a model becomes too specialized to the training data and performs poorly on unseen data. Regularization techniques, such as L1 and L2 regularization, can help prevent overfitting by penalizing overly complex models. Python from sklearn.linear_model import Ridge # Assume X_train and y_train are the training data alpha = 0.5 # Regularization strength ridge = Ridge(alpha=alpha) ridge.fit(X_train, y_train) Another way to prevent overfitting is through the use of validation sets and cross-validation. By regularly evaluating the model's performance on a separate validation set during the training process, we can keep track of how well the model is generalizing to unseen data. Python from sklearn.model_selection import cross_val_score from sklearn.linear_model import LinearRegression # Assume X and y are the feature matrix and target variable model = LinearRegression() cv_scores = cross_val_score(model, X, y, cv=5) # 5-fold cross-validation Balancing Exploration and Exploitation Striking the right balance between exploration (trying out new actions) and exploitation (sticking to known actions) is a key challenge in reinforcement learning. Several strategies can be used to manage this tradeoff. One common approach is the epsilon-greedy strategy, where the agent mostly takes the action that it currently thinks is best (exploitation), but with a small probability (epsilon), it takes a random action (exploration). Python import numpy as np def epsilon_greedy(Q, state, n_actions, epsilon): if np.random.random() < epsilon: return np.random.randint(n_actions) # Exploration: choose a random action else: return np.argmax(Q[state]) # Exploitation: choose the action with the highest Q-value Another approach is the Upper Confidence Bound (UCB) method, where the agent chooses actions based on an upper bound of the expected reward, encouraging exploration of actions with high potential. Python import numpy as np import math def ucb_selection(plays, rewards, t): n_arms = len(plays) ucb_values = [0] * n_arms for i in range(n_arms): if plays[i] == 0: ucb_values[i] = float('inf') else: ucb_values[i] = rewards[i] / plays[i] + math.sqrt(2 * math.log(t) / plays[i]) return np.argmax(ucb_values) Future Perspectives The intersection of reinforcement learning and algorithmic trading is a burgeoning field, and while it's already showing promise, there are several exciting developments on the horizon. One of the most prominent trends is the increasing use of deep reinforcement learning, which combines the decision-making capabilities of reinforcement learning with the pattern recognition capabilities of deep learning. Deep reinforcement learning has the potential to handle much more complex decision-making tasks, making it especially suited to the intricacies of financial markets. We can also expect to see more sophisticated reward structures in reinforcement learning models. Current models often use simple reward structures, such as profit or loss from a trade. However, future models could incorporate more nuanced rewards, taking into account factors such as risk, liquidity, and transaction costs. This would allow for the development of more balanced and sustainable trading strategies. 
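As a toy illustration of such a reward structure, the sketch below combines raw profit with penalties for recent volatility (a simple risk proxy) and transaction costs. The weights and window length are arbitrary assumptions for demonstration, not a recommended configuration.

Python

import numpy as np

def shaped_reward(pnl_history, trade_cost, risk_weight=0.1, cost_weight=1.0):
    """Reward = latest profit-and-loss, penalized by recent volatility and costs.

    pnl_history: sequence of per-step profit/loss values, most recent last.
    trade_cost: transaction cost incurred at the current step.
    """
    pnl = pnl_history[-1]
    # Standard deviation of the last 20 P&L values as a crude risk measure.
    volatility = np.std(pnl_history[-20:]) if len(pnl_history) > 1 else 0.0
    return pnl - risk_weight * volatility - cost_weight * trade_cost

# Example: a small profit, a moderately volatile recent history, and a fixed fee.
print(shaped_reward([0.5, -0.2, 0.8, 0.3], trade_cost=0.05))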
Another intriguing prospect is the use of reinforcement learning for portfolio management. Instead of making decisions about individual trades, reinforcement learning could be used to manage a portfolio of assets, deciding what proportion of the portfolio to allocate to each asset in order to maximize returns and manage risk. In terms of research, there's a lot of ongoing work aimed at overcoming the challenges associated with reinforcement learning in trading. For instance, researchers are exploring methods to manage the exploration-exploitation tradeoff more effectively, to deal with the curse of dimensionality, and to prevent overfitting. In conclusion, while reinforcement learning in algorithmic trading is still a relatively new field, it holds immense potential. By continuing to explore and develop this technology, we could revolutionize algo-trading, making it more efficient, adaptable, and profitable. As technology professionals, we have the exciting opportunity to be at the forefront of this revolution.
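To tie the concepts above together (states, actions, rewards, and an epsilon-greedy policy), here is a deliberately simplified Q-learning sketch on synthetic prices. The state is just the direction of the last price move, the actions are hold/buy/sell of a single unit, and every number is an illustrative assumption rather than a viable trading strategy.

Python

import numpy as np

rng = np.random.default_rng(0)
prices = np.cumsum(rng.normal(0, 1, 500)) + 100   # synthetic random-walk prices

n_states, n_actions = 2, 3        # state: last move down/up; actions: hold, buy, sell
Q = np.zeros((n_states, n_actions))
alpha, gamma, epsilon = 0.1, 0.95, 0.1
position = 0                      # -1 short, 0 flat, +1 long

for t in range(1, len(prices) - 1):
    state = int(prices[t] > prices[t - 1])           # 1 if the last move was up
    if rng.random() < epsilon:                       # exploration
        action = rng.integers(n_actions)
    else:                                            # exploitation
        action = int(np.argmax(Q[state]))

    position = {0: position, 1: 1, 2: -1}[action]    # hold keeps the current position
    reward = position * (prices[t + 1] - prices[t])  # P&L of holding into the next step

    next_state = int(prices[t + 1] > prices[t])
    Q[state, action] += alpha * (reward + gamma * np.max(Q[next_state]) - Q[state, action])

print("Learned Q-values:\n", Q)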
Vector databases are currently all the rage in the tech world, and it isn't just hype. Vector search has become ever more critical due to artificial intelligence advances which make use of vector embeddings. These vector embeddings are vector representations of words, sentences, or documents that provide semantic similarity for semantically close inputs by simply looking at a distance metric between the vectors. The canonical example comes from word2vec, in which the embedding of the word "king" was very near the vector that results from the vectors of the words "queen", "man", and "woman" when arranged in the following formula: king - man + woman ≈ queen The fact that this works has always seemed amazing to me, but it works even for fairly large documents provided our embedding space is of sufficiently high dimension. With modern deep learning methods, you can get excellent embeddings of complex documents. For TerminusDB we needed a way to leverage these sorts of embeddings for the following tasks that our users have asked for: full-text search, entity resolution (finding other documents which are likely the same, for deduplication), similarity search (for related content or for recommender systems), and clustering. We decided to prototype using OpenAI's embeddings, but in order to get the rest of the features we required a vector database. We needed a few unusual features, including the ability to do incremental indexing and the ability to index on the basis of commits, so we know precisely what commit an index applies to. This allows us to put indexing into our CI workflows. A versioned open-source vector database doesn't exist in the wild. So we wrote one! Writing a Vector Database A vector database is a store of vectors with the ability to compare any two vectors using some metric. The metric can be a lot of different things, such as Euclidean distance, cosine similarity, taxicab geometry, or really anything that obeys the triangle inequality rules required to define a metric space. In order to make this fast, you need some sort of indexing structure to quickly find candidates that are already close for comparison. Otherwise, many operations will need to compare with every single thing in the database, every time. There are many approaches to indexing vector spaces, but we went with the HNSW (Hierarchical Navigable Small World) graph (see Malkov and Yashunin). HNSW is easy to understand and provides good performance in both low and high dimensions, so it is flexible. Most importantly, there is a very clear open-source implementation that we found - HNSW for Rust Computer Vision. Storing the Vectors Vectors are stored in a domain. This helps to separate different vector stores that do not need to describe the same vectors. For TerminusDB we have many different commits that all pertain to the same vectors, so it's important that we put them all into the same domain. [The original post includes a small diagram of the vector store layout: a sequence of numbered pages, each holding several fixed-size vectors.] The vector store is page-based, where each buffer is designed to map cleanly to the operating system pages but fit the vectors we use closely. We assign each vector an index, and then we can map from the index to the appropriate page and offset. Inside the HNSW index, we refer to a LoadedVec. This ensures that the page lives in a currently loaded buffer so we can perform our metric comparisons on the vectors of interest.
As soon as the last LoadedVec drops from a buffer, the buffer can be added back into a buffer pool to be used to load a new page. Creating a Versioned Index We build an HNSW structure for each (domain + commit) pair. If starting a new index, we start with an empty HNSW. If starting an incremental index from a previous commit, we load the old HNSW from the previous commit and then begin our indexing operations. What is new versus what is old is all kept in TerminusDB, which knows how to find changes between commits and can submit them to the vector database indexer. The indexer only needs to know the operations it is being asked to perform (i.e., Insert, Delete, Replace). We maintain the indexes themselves in an LRU pool that allows us to load on demand or use a cache if the index is already in memory. Since we only perform destructive operations at commits, this caching is always coherent. When we save the index, we serialize the structure with the raw vector index as a stand-in for the LoadedVec which helps to keep the index small. In the future, we would like to use some of the tricks we have learned in TerminusDB to keep layers of an index around, so new layers can be added without requiring each incremental index to add a duplicate when serializing. However, the indexes have proved small enough compared to the vectors we are storing that it has not mattered much. NOTE: While we currently do incremental indexing, we have yet to implement the delete and replace operations (there are only so many hours in a week!). I've read the literature on HNSW and it seems that it is not yet well described. We have a design for the delete and replace operations that we think will work well with HNSW and wanted to explain in case any technical people have ideas: If we are in an upper layer of the HNSW, then simply ignore the deletion - it should not matter much as most vectors are not in upper layers, and the ones that are, are only for navigation. If we are in the zero layer but not in an above layer, delete the node from the index, while trying to replace links between all neighbors of the deleted link according to closeness. If we are in the zero layer but also above, mark the node as deleted, and use it for navigation but do not store this node in the candidate pool. Finding Embeddings We use OpenAI to define our embeddings, and after an indexing request is made to TerminusDB, we feed each of the documents to OpenAI which returns lists of float vectors in JSON. It turns out that the embeddings are quite sensitive to context. We tried initially just submitting TerminusDB JSON documents and the results were not fantastic. However, we found that if we define a GraphQL query + Handlebars template, we can create very high-quality embeddings. 
For People in Star Wars, this pair, which is defined in our schema, looks like this: JSON { "embedding": { "query": "query($id: ID){ People(id : $id) { birth_year, created, desc, edited, eye_color, gender, hair_colors, height, homeworld { label }, label, mass, skin_colors, species { label }, url } }", "template": "The person's name is {{label}}.{{#if desc}} They are described with the following synopsis: {{#each desc}} *{{this}} {{/each}}.{{/if}}{{#if gender}} Their gender is {{gender}}.{{/if}}{{#if hair_colors}} They have the following hair colours: {{hair_colors}}.{{/if}}{{#if mass}} They have a mass of {{mass}}.{{/if}}{{#if skin_colors}} Their skin colours are {{skin_colors}}.{{/if}}{{#if species}} Their species is {{species.label}}.{{/if}}{{#if homeworld}} Their homeworld is {{homeworld.label}}.{{/if}}" } } The meaning of each field in the People object is rendered as text which helps OpenAI understand what we mean, providing much better semantics. Ultimately it would be nice if we could guess these sentences from a combination of our schema documentation and the schema structure, which is probably also possible using AI chat! But for now, this works brilliantly and does not require much technical sophistication. Indexing Star Wars So what happens when we actually run this thing? Well, we tried it out on our Star Wars data product to see what would happen. First, we fire off an index request, and our indexer obtains the information from TerminusDB: curl 'localhost:8080/index?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' This returns with a task-id which we can use to poll an endpoint for completion. The index file and vector files for the domain admin/star_wars and the commit o2uq7k1mrun1vp4urktmw55962vlpto come out as: admin%2Fstar_wars@o2uq7k1mrun1vp4urktmw55962vlpto.hnsw and admin%2Fstar_wars.vecs. We can now ask the semantic index server about our documents at the specified commit. curl 'localhost:8080/search?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' -d "Who are the squid people" We get back a number of results as JSON which look like this: JSON [{"id":"terminusdb:///star-wars/Species/8","distance":0.09396297}, ...] But what is the embedding string we used to produce this result? This is how the text rendered for the Species/8 id: JSON "The species name is Mon Calamari. They have the following hair colours: none. Their skin colours are red, blue, brown, magenta. They speak the Mon Calamarian language." Amazing! Notice that it never says squid anywhere! There is some pretty amazing work being done by our embeddings here. Let's have another go: curl 'localhost:8080/search?commit=o2uq7k1mrun1vp4urktmw55962vlpto&domain=admin/star_wars' -d "Wise old man" JSON "The person's name is Yoda. They are described with the following synopsis: Yoda is a fictional character in the Star Wars franchise created by George Lucas, first appearing in the 1980 film The Empire Strikes Back. In the original films, he trains Luke Skywalker to fight against the Galactic Empire. In the prequel films, he serves as the Grand Master of the Jedi Order and as a high-ranking general of Clone Troopers in the Clone Wars. Following his death in Return of the Jedi at the age of 900, Yoda was the oldest living character in the Star Wars franchise in canon, until the introduction of Maz Kanata in Star Wars: The Force Awakens. Their gender is male. They have the following hair colours: white. They have a mass of 17. Their skin colours are green." Incredible!
While we do say "oldest" in the text, we don't say "wise" or "man"! I hope you can see how this could be helpful for you in getting high-quality semantic indexing of your data! Conclusion We have also added endpoints to find neighboring documents and to find duplicates that search the entire corpus. The latter was used on some benchmarks and has performed admirably. We hope to show the results of these experiments here soon. While there are really great vector databases out there in the wild, such as Pinecone, we want to have a sidecar that integrates well with TerminusDB and which can be used for less technical users who care about content primarily and are not going to be spinning up their own vector database. We are really excited about the potential of this VectorLink, and would love people to have a look at what we have so far! Please forgive us a bit for the relatively sparse error handling. We're working furiously on it!
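For readers who want to see what the underlying metric comparison looks like, here is a tiny brute-force sketch of cosine-distance search over a handful of vectors. This exhaustive comparison is exactly what an HNSW index exists to avoid at scale, and the vectors here are made-up stand-ins for real embeddings.

Python

import numpy as np

def cosine_distance(a, b):
    # 0 means identical direction; larger values mean less similar.
    return 1.0 - np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Made-up 4-dimensional "embeddings" keyed by document id.
vectors = {
    "doc-1": np.array([0.9, 0.1, 0.0, 0.2]),
    "doc-2": np.array([0.8, 0.2, 0.1, 0.3]),
    "doc-3": np.array([0.0, 0.9, 0.8, 0.1]),
}

query = np.array([0.85, 0.15, 0.05, 0.25])

# Brute force: compare the query against every stored vector and sort by distance.
ranked = sorted(vectors.items(), key=lambda kv: cosine_distance(query, kv[1]))
for doc_id, vec in ranked:
    print(doc_id, round(cosine_distance(query, vec), 4))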
We want to save our thumbnail data to a database so that we can render our pictures to a nice HTML gallery page and finish the proof of concept for our Google Photos clone! Which database should we use and why? Which Java database API? What database tools will make our lives easier along the way? Find out in this episode of Marco Codes! What’s in the Video 00:00 Intro We'll cover what the plan is for this episode: to add database capabilities to our Google Photos clone, which currently only works with files, but doesn't store their metadata in a database table. 00:52 Where We Left Off Before jumping straight into implementing database and ORM features, we will do a quick code recap of the previous episodes, to remind ourselves how the image scanning and conversion process currently works. 01:46 Setup Whenever we want to do something with databases and Java, we need a couple of (in this case) Maven dependencies. More specifically we want to make sure to add the H2 database to our project, which we will use for production, not just for testing! We'll also add the HikariCP connection pool to it - something I do by default in every project and which is usually done automatically by frameworks like Spring Boot. 04:38 Writing a Database Schema Here, I present my current approach when doing Java database work: making sure the database schema is hand-written, thinking through table names, column names, types, etc. Hence, we'll start writing a schema.sql file for our new "media" table during this section. 10:08 Creating a DataSource Having created the schema, we'll need to create a DataSource next. As we're using HikariCP, we'll follow its documentation pages to set up the DataSource. We'll also make sure the schema.sql file written earlier gets automatically executed whenever we run our application. 12:46 Saving Thumbnail Data It's finally time to not just render thumbnail files on disk, but also save information about the generated thumbnails and original images in our brand-new database table! We'll use plain JDBC to do that and talk about its advantages and disadvantages. 14:00 Refactoring Maneuver Sometimes you just need to _see_ certain things that are very hard to explain in words. To clean up our program, we will have to change a couple of method signatures and move parameters up and down throughout the file. 16:21 Extracting Image Creation Dates At the moment, we don't properly detect the image creation date from its metadata. We'll talk about how to implement this in the future and why we'll stick with the file creation date for now. 17:10 Avoiding Duplication We'll also need to handle duplicates. If we re-run our program several times, we don't want to store the image metadata multiple times in our tables. Let's fix this here. 19:04 Inspecting H2 File DBs In case you don't know how to access H2 file databases, we will spend some time showing you how to do that from inside IntelliJ IDEA and its database tool window. 21:23 Rendering HTML Output Last but not least, we'll need to render all the information from our database to a nice, little HTML page, so we can actually browse our thumbnails! As a bonus point, this will be the simplest and probably dirtiest implementation of such an HTML page you've seen for a while - but it works! 30:30 What’s Next? Did you like what you saw? Which feature should we implement next? Let me know! Video
This post explains how to launch an Amazon EMR cluster and deploy a Kedro project to run a Spark job. Amazon EMR (previously called Amazon Elastic MapReduce) is a managed cluster platform for applications built using open-source big data frameworks, such as Apache Spark, that process and analyze vast amounts of data with AWS. 1. Set up the Amazon EMR Cluster One way to install Python libraries onto Amazon EMR is to package a virtual environment and deploy it. To do this, the packaging environment needs to have the same Amazon Linux 2 environment as used by Amazon EMR. We used this example Dockerfile to package our dependencies on an Amazon Linux 2 base. Our example Dockerfile is as below: Shell FROM --platform=linux/amd64 amazonlinux:2 AS base RUN yum install -y python3 ENV VIRTUAL_ENV=/opt/venv RUN python3 -m venv $VIRTUAL_ENV ENV PATH="$VIRTUAL_ENV/bin:$PATH" COPY requirements.txt /tmp/requirements.txt RUN python3 -m pip install --upgrade pip && \ python3 -m pip install venv-pack==0.2.0 && \ python3 -m pip install -r /tmp/requirements.txt RUN mkdir /output && venv-pack -o /output/pyspark_deps.tar.gz FROM scratch AS export COPY --from=base /output/pyspark_deps.tar.gz / Note: A DOCKER_BUILDKIT backend is necessary to run this Dockerfile (make sure you have it installed). Run the Dockerfile using the following command: DOCKER_BUILDKIT=1 docker build --output . <output-path> This will generate a pyspark_deps.tar.gz file at the <output-path> specified in the command above. Use this command if your Dockerfile has a different name: DOCKER_BUILDKIT=1 docker build -f Dockerfile-emr-venv --output . <output-path> 2. Set up CONF_ROOT The kedro package command only packs the source code, yet the conf directory is essential for running any Kedro project. To make it available to Kedro separately, its location can be controlled by setting CONF_ROOT. By default, Kedro looks at the root conf folder for all its configurations (catalog, parameters, globals, credentials, logging) to run the pipelines, but this can be customised by changing CONF_ROOT in settings.py. For Kedro versions < 0.18.5: Change CONF_ROOT in settings.py to the location where the conf directory will be deployed. It could be anything, e.g. ./conf or /mnt1/kedro/conf. For Kedro versions >= 0.18.5: Use the --conf-source CLI parameter directly with kedro run to specify the path. CONF_ROOT need not be changed in settings.py. 3. Package the Kedro Project Package the project using the kedro package command from the root of your project folder. This will create a .whl in the dist folder that will be passed to spark-submit on the Amazon EMR cluster via --py-files to refer to the source code. 4. Create .tar for conf As described, the kedro package command only packs the source code, yet the conf directory is essential for running any Kedro project. Therefore it needs to be deployed separately as a tar.gz file. It is important to note that the contents inside the folder need to be zipped, not the conf folder itself. Use the following command to zip the contents of the conf directory and generate a conf.tar.gz file containing catalog.yml, parameters.yml, and other files needed to run the Kedro pipeline. It will be used with spark-submit for the --archives option to unpack the contents into a conf directory: tar -czvf conf.tar.gz --exclude="local" conf/* 5. Create an Entrypoint for the Spark Application Create an entrypoint.py file that the Spark application will use to start the job.
This file can be modified to take arguments and can be run using main(sys.argv) after removing the params array, for example: python entrypoint.py --pipeline my_new_pipeline --params run_date:2023-02-05,runtime:cloud. This would mimic the exact kedro run behaviour.

Python
import sys

from proj_name.__main__ import main

if __name__ == "__main__":
    """
    These params could be used as *args to test pipelines locally.
    The example below will run `my_new_pipeline` using `ThreadRunner`,
    applying a set of params:

    params = [
        "--pipeline", "my_new_pipeline",
        "--runner", "ThreadRunner",
        "--params", "run_date:2023-02-05,runtime:cloud",
    ]
    main(params)
    """
    main(sys.argv)

6. Upload Relevant Files to S3
Upload the relevant files to an S3 bucket (Amazon EMR should have access to this bucket) in order to run the Spark job. The following artifacts should be uploaded to S3:
The .whl file created in step 3
The virtual environment tar.gz created in step 1 (e.g. pyspark_deps.tar.gz)
The .tar file for the conf folder created in step 4 (e.g. conf.tar.gz)
The entrypoint.py file created in step 5

7. spark-submit to the Amazon EMR Cluster
Use the following spark-submit command as a step on Amazon EMR running in cluster mode. A few points to note:
pyspark_deps.tar.gz is unpacked into a folder named environment.
Environment variables are set referring to the libraries unpacked in the environment directory above, e.g. PYSPARK_PYTHON=environment/bin/python.
The conf directory is unpacked to the folder specified after the # symbol (s3://{S3_BUCKET}/conf.tar.gz#conf). Note the following:
Kedro versions < 0.18.5: the folder location/name after the # symbol should match CONF_ROOT in settings.py.
Kedro versions >= 0.18.5: you could follow the same approach as earlier. However, Kedro now provides the flexibility to pass the configuration location through the --conf-source CLI parameter instead of setting CONF_ROOT in settings.py. Therefore, --conf-source could be specified directly in the CLI parameters and step 2 can be skipped completely.

Shell
spark-submit --deploy-mode cluster --master yarn \
  --conf spark.submit.pyFiles=s3://{S3_BUCKET}/<whl-file>.whl \
  --archives=s3://{S3_BUCKET}/pyspark_deps.tar.gz#environment,s3://{S3_BUCKET}/conf.tar.gz#conf \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=environment/bin/python \
  --conf spark.executorEnv.PYSPARK_PYTHON=environment/bin/python \
  --conf spark.yarn.appMasterEnv.<env-var-here>={ENV} \
  --conf spark.executorEnv.<env-var-here>={ENV} \
  s3://{S3_BUCKET}/run.py --env base --pipeline my_new_pipeline --params run_date:2023-03-07,runtime:cloud

Summary
This post describes the sequence of steps needed to deploy a Kedro project to an Amazon EMR cluster:
Set up the Amazon EMR cluster
Set up CONF_ROOT (optional for Kedro versions >= 0.18.5)
Package the Kedro project
Create a .tar for conf
Create an entrypoint for the Spark application
Upload relevant files to S3
spark-submit to the Amazon EMR cluster
Kedro supports a range of deployment targets, including Amazon SageMaker, Databricks, Vertex AI, and Azure ML, and our documentation additionally includes a range of approaches for single-machine deployment to a production server.
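As referenced in step 2, here is a minimal, illustrative sketch of the two configuration options. The values are placeholders and should be adapted to wherever conf.tar.gz is unpacked on the cluster; it is a sketch, not canonical project settings.

Python
# settings.py (illustrative, Kedro < 0.18.5): point Kedro at the location where
# the conf archive will be unpacked on the cluster, e.g. "conf" or "/mnt1/kedro/conf"
CONF_ROOT = "conf"

# Kedro >= 0.18.5: no settings.py change is needed; pass the location at runtime instead,
# e.g. kedro run --conf-source=conf --pipeline my_new_pipeline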
Time series analysis is a specialized branch of statistics that involves the study of ordered, often temporal data. Its applications span a multitude of fields, including finance, economics, ecology, neuroscience, and physics. Given the temporal dependency of the data, traditional validation techniques such as K-fold cross-validation cannot be applied, thereby necessitating unique methodologies for model training and validation. This comprehensive guide walks you through the crucial aspects of time series analysis, covering topics such as cross-validation techniques, time series decomposition and transformation, feature engineering, derivative utilization, and a broad range of time series modeling techniques. Whether you are a novice just starting out or an experienced data scientist looking to hone your skills, this guide offers valuable insights into the complex yet intriguing world of time series analysis. Cross-Validation Techniques for Time Series Executing cross-validation on time series data necessitates adherence to the chronological arrangement of observations. Therefore, traditional methods like K-Fold cross-validation are unsuitable as they jumble the data randomly, disrupting the sequence of time. Two principal strategies exist for cross-validation on time series data: "Rolling Forecast Origin" and "Expanding Window." Rolling Forecast Origin Approach This approach involves a sliding data window for training, followed by a test set right after the training window. With each cycle, the window advances in time, a process also known as "walk-forward" validation. In the following example, the data is segregated into five parts. During each cycle, the model is first trained on the initial part, then on the first and second parts, and so on, testing on the subsequent part each time. Python from sklearn.model_selection import TimeSeriesSplit import numpy as np # Assuming 'X' and 'y' are your features and target tscv = TimeSeriesSplit(n_splits=5) for train_index, test_index in tscv.split(X): X_train, X_test = X[train_index], X[test_index] y_train, y_test = y[train_index], y[test_index] # Fit and evaluate your model here Expanding Window Technique The expanding window technique resembles the rolling forecast origin method, but instead of using a fixed-size sliding window, the training set grows over time to incorporate more data. In this example, we set max_train_size=None to ensure the training set size expands in each iteration. The model is first trained on the first part, then the first and second parts, and so on, testing on the subsequent part each time. Python from sklearn.model_selection import TimeSeriesSplit import numpy as np # Assuming 'X' and 'y' are your features and target tscv = TimeSeriesSplit(n_splits=5, max_train_size=None) for train_index, test_index in tscv.split(X): X_train, X_test = X[train_index], X[test_index] y_train, y_test = y[train_index], y[test_index] # Fit and evaluate your model here Decomposing and Transforming Time Series The process of breaking down time series into its constituent parts can significantly aid analysis. This process falls into two categories: decomposing and transforming. Decomposing of Time Series Time series dissection is a technique used to break down a time series into its elemental components to facilitate understanding and analysis of underlying patterns and trends. The primary components of a time series are: Trend: This represents the long-term movement or overall direction of the time series data. 
It signifies the basic pattern or the general course the data is following.
Seasonality: These are repeating patterns or fluctuations in the time series data that occur at regular intervals, such as daily, weekly, monthly, or yearly. Seasonality is driven by external factors like weather, holidays, or other seasonal events.
Irregular component: Also termed noise or residual, this is the unexplained variation in the time series data after removing the trend, seasonality, and cyclical components. It represents random, unpredictable fluctuations that cannot be attributed to any systematic pattern.
In Python, decomposition can be performed using the statsmodels or statsforecast libraries:

Python
import matplotlib.pyplot as plt
from statsforecast import StatsForecast
from statsforecast.models import MSTL, AutoARIMA

# Assume 'data' is your time series in the format expected by StatsForecast
models = [
    MSTL(
        season_length=[12 * 7],        # seasonalities of the time series
        trend_forecaster=AutoARIMA(),  # model used to forecast trend
    )
]

sf = StatsForecast(
    models=models,  # model used to fit each time series
    freq="D",       # frequency of the data
)
sf = sf.fit(data)
test = sf.fitted_[0, 0].model_
fig, ax = plt.subplots(1, 1, figsize=(10, 8))
test.plot(ax=ax, subplots=True, grid=True)
plt.tight_layout()
plt.show()

Different models manage these elements in diverse ways. Both linear models and Prophet inherently deal with all these factors, whereas other models such as boosting trees and neural networks require explicit data transformation and feature creation. A common strategy after decomposition is to model each component separately. Once independent predictions have been derived for each component, they can be merged to generate the final forecast. This method enables a more detailed examination of time series data and frequently results in superior predictive outcomes.

Transforming Time Series Data
Transformations are frequently applied to time series data to stabilize variability, diminish outlier effects, enhance forecasting accuracy, and facilitate modeling. Some prevalent transformation types are:

Differencing
This transformation subtracts the preceding value (t-1) from the current value (t). Its primary function is to eliminate trend and seasonality, thus rendering the data stationary, a prerequisite for specific time series models such as ARIMA. The diff() function in pandas can be employed to perform differencing in Python:

Python
import pandas as pd

df['diff'] = df['col'].diff()

Log Transformation
Log transformation is beneficial when working with data that displays exponential growth or possesses multiplicative seasonality. This transformation helps stabilize variance and mitigate heteroscedasticity. The numpy library in Python can be used to execute a log transformation:

Python
import numpy as np

df['log_col'] = np.log(df['col'])

Square Root Transformation
The square root transformation helps stabilize variance and reduce the influence of extreme values or outliers. This transformation can be accomplished using numpy:

Python
df['sqrt_col'] = np.sqrt(df['col'])

Box-Cox Transformation
This transformation encompasses both the log and square root transformations as particular cases and helps stabilize the variance and make the data more Gaussian. The Box-Cox transformation can be written as y(λ) = (y^λ - 1) / λ for λ ≠ 0, and ln(y) for λ = 0. Python's SciPy library offers a function for the Box-Cox transformation.
Python
from scipy import stats

df['boxcox_col'], lambda_param = stats.boxcox(df['col'])

It's crucial to remember that any predictions made using transformed data must be reverted to the original scale before interpretation. For instance, if a log transformation was employed, the predicted values should be exponentiated, i.e., e raised to the power of the predicted values. The selection of a transformation depends on the specific problem, the characteristics of the data, and the assumptions of the ensuing modeling process. Each transformation has its advantages and disadvantages, and no single solution fits all scenarios. Therefore, it's advisable to experiment with different transformations and select the one that yields the best model performance.

Time Series Feature Engineering
Feature engineering in the context of time series refers to the creation of meaningful predictors, or features, from raw temporal data to improve the performance of machine learning models. As opposed to problems involving tabular data, time series provides an extra dimension to work with: time. Here are some prominent feature engineering techniques for time series:
Lag features: Created by shifting the time series by a designated number of steps (lags), these capture prior patterns or dependencies and are highly useful predictors.
Rolling and expanding window statistics: These calculate summary metrics (like mean, median, and standard deviation) over a sliding or growing window of time, helping to spot local trends and patterns.
Seasonal and trend features: These help portray the behavior of individual components within the model.
Date/time-based features: Attributes such as the time of day, day of the week, month, or fiscal quarter can help identify repeated patterns in the data.
Fourier transforms: These transformations can help discover recurrent patterns in the data.

Employing Derivatives in Time Series Analysis
In time series analysis, derivatives denote the rate at which a time series variable changes over time. They capture the changes in a variable's value over a period and can be used as features in time series analysis or machine learning models, as they convey insights about underlying trends, cyclical changes, and other patterns in the data. Here's how derivatives can serve as features in time series analysis:
First derivative: This illustrates the instantaneous rate of change of a variable with respect to time. It helps discern the direction and size of the trend in the data.
Second derivative: This signifies the rate of change of the first derivative. It helps identify acceleration or deceleration in the trend and pinpoint inflection points.
Seasonal derivatives: These are useful for identifying cyclical fluctuations in the data.
Cross-derivatives: These are useful for discerning the joint effect of pairs of variables.

Time Series Modeling
Establishing a performance benchmark with baseline models is an essential step in time series forecasting, as it enables the evaluation of more advanced models. Baseline models such as Exponential Smoothing, ARIMA, and Prophet remain highly effective.

Exponential Smoothing
In this instance, we'll employ the Holt-Winters method of exponential smoothing, which is designed to handle trends and seasonality. The statsmodels library is where this method can be found.
Python from statsmodels.tsa.api import ExponentialSmoothing # Assume 'df' is a pandas DataFrame and 'col' is the time series to be forecasted y = df['col'] # Fit the model model = ExponentialSmoothing(y, seasonal_periods=12, trend='add', seasonal='add') model_fit = model.fit() # Forecast the next 5 periods forecast = model_fit.forecast(5) ARIMA The AutoRegressive Integrated Moving Average (ARIMA) is a widely-used technique for predicting univariate time series data. You can find an implementation of ARIMA in the statsmodels library. Python from statsmodels.tsa.arima.model import ARIMA # Fit the model model = ARIMA(y, order=(1,1,1)) # (p, d, q) parameters model_fit = model.fit() # Forecast the next 5 periods forecast = model_fit.forecast(steps=5) The parameters (p, d, q) correspond to the autoregressive, integrated, and moving average portions of the model, respectively. Prophet Prophet is a forecasting approach for time series data built around an additive model that accommodates non-linear trends with daily, weekly, and yearly seasonality and holiday effects. Facebook is behind its development. Python from fbprophet import Prophet # The input to Prophet is always a DataFrame with two columns: ds and y. df = df.rename(columns={'date': 'ds', 'col': 'y'}) # Fit the model model = Prophet() model.fit(df) # Create DataFrame for future predictions future = model.make_future_dataframe(periods=5) # Forecast the next 5 periods forecast = model.predict(future) In these examples, model parameters like the ARIMA model's order or the type of trend and seasonality in the Exponential Smoothing model are chosen somewhat arbitrarily. In a real-world scenario, you'd apply model diagnostics, cross-validation, and potentially grid search to determine the optimal parameters for your specific dataset. Moreover, new libraries such as Nixlab offer a variety of built-in models that provide alternative viewpoints. Nevertheless, as baseline models, these methods serve as an excellent starting point for time series forecasting and accommodate a broad spectrum of time series characteristics. Once you have some baseline scores, you can then explore advanced models such as boosting trees, Neural Networks (NNs), N-BEATS, and Temporal Fusion Transformers (TFT). Gradient Boosting In the domain of time series analysis, gradient boosting stands out due to its ability to make no presumptions about the data distribution or the interconnections among variables. This attribute renders it highly adaptable and proficient at handling intricate, non-linear associations typically found in time series data. Though employing gradient boosting for time series prediction isn't as intuitive as conventional models like ARIMA, its efficacy remains high. The trick lies in structuring the prediction issue as a supervised learning problem, generating lagged features that encapsulate the temporal interdependencies in the data. 
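As a minimal sketch of that supervised framing, assuming the series lives in a pandas DataFrame df under a column named col (both names are placeholders), the lagged features could be built like this:

Python
import pandas as pd

# Build lagged predictors for a univariate series stored in df['col'];
# df, 'col', and the number of lags are illustrative placeholders
n_lags = 3
features = df.copy()
for lag in range(1, n_lags + 1):
    features[f'col_lag_{lag}'] = features['col'].shift(lag)

# The first n_lags rows have no complete history, so drop them
features = features.dropna()

X = features[[f'col_lag_{lag}' for lag in range(1, n_lags + 1)]]
y = features['col']

The lagged columns then play the role of the X_train/X_test predictors used in the XGBoost example that follows.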
Here's an illustration of utilizing the XGBoost library, a highly optimized variant of gradient boosting, for time series prediction:

Python
import numpy as np
import xgboost as xgb
from sklearn.metrics import mean_squared_error

# Assume 'X_train', 'y_train', 'X_test', and 'y_test' are your training and test datasets

# Convert datasets into DMatrix (optimized data structure for XGBoost)
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test)

# Define the model parameters
params = {
    'max_depth': 3,
    'eta': 0.01,
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse'
}

# Train the model
model = xgb.train(params, dtrain, num_boost_round=1000)

# Make predictions
y_pred = model.predict(dtest)

# Evaluate the model
mse = mean_squared_error(y_test, y_pred)
print(f'Test RMSE: {np.sqrt(mse)}')

Recurrent Neural Networks (RNNs)
For RNNs, we will employ the LSTM (Long Short-Term Memory) variant, renowned for capturing long-range dependencies and for being less susceptible to the vanishing gradient problem.

Python
import torch
from torch import nn

# Assume 'X_train' and 'y_train' are your training dataset
# 'n_features' is the number of input features
# 'n_steps' is the number of time steps in each sample
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x, _ = self.lstm(x)
        x = self.fc(x[:, -1, :])  # we only want the last time step
        return x

# Define the model
model = LSTM(n_features, 50, 1)

# Define loss and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Training loop
for epoch in range(200):
    model.train()
    output = model(X_train)
    loss = criterion(output, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Make a prediction
model.eval()
x_input = ...  # some new input data
x_input = torch.tensor(x_input, dtype=torch.float32)
x_input = x_input.view((1, n_steps, n_features))
yhat = model(x_input)

N-BEATS
N-BEATS, a cutting-edge model for univariate time series prediction, is available in the PyTorch-based pytorch-forecasting package.
Python from pytorch_forecasting import TimeSeriesDataSet, NBeats # Assume 'data' is a pandas DataFrame containing your time series data # 'group_ids' is the column(s) that identifies each time series # 'target' is the column you want to forecast # Create the dataset max_encoder_length = 36 max_prediction_length = 6 training_cutoff = data["time_idx"].max() - max_prediction_length context_length = max_encoder_length prediction_length = max_prediction_length training = TimeSeriesDataSet( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="target", group_ids=["group_ids"], max_encoder_length=context_length, max_prediction_length=prediction_length, ) # Create the dataloaders train_dataloader = training.to_dataloader(train=True, batch_size=128, num_workers=0) # Define the model pl.seed_everything(42) trainer = pl.Trainer(gpus=0, gradient_clip_val=0.1) net = NBeats.from_dataset(training, learning_rate=3e-2, weight_decay=1e-2, widths=[32, 512], backcast_loss_ratio=1.0) # Fit the model trainer.fit( net, train_dataloader=train_dataloader, ) # Predict raw_predictions, x = net.predict(val_dataloader, return_x=True) Temporal Fusion Transformers (TFT) TFT, another robust model adept at handling multivariate time series and meta-data, is also included in the pytorch-forecasting package. Boosting trees primarily undertake most tasks due to their scalability and robust performance. Nevertheless, given sufficient time and resources, Recurrent Neural Networks (RNNs), N-BEATS, and TFT have demonstrated impressive performance in time series tasks, in spite of their significant computational demands and particular scalability. Python # Import libraries import torch import pandas as pd from pytorch_forecasting import TimeSeriesDataSet, TemporalFusionTransformer, Baseline, Trainer from pytorch_forecasting.metrics import SMAPE from pytorch_forecasting.data.examples import get_stallion_data # Load example data data = get_stallion_data() data["time_idx"] = data["date"].dt.year * 12 + data["date"].dt.month data["time_idx"] -= data["time_idx"].min() # Define dataset max_prediction_length = 6 max_encoder_length = 24 training_cutoff = data["time_idx"].max() - max_prediction_length context_length = max_encoder_length prediction_length = max_prediction_length training = TimeSeriesDataSet( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="volume", group_ids=["agency"], min_encoder_length=context_length, max_encoder_length=context_length, min_prediction_length=prediction_length, max_prediction_length=prediction_length, static_categoricals=["agency"], static_reals=["avg_population_2017"], time_varying_known_categoricals=["month"], time_varying_known_reals=["time_idx", "minimum", "mean", "maximum"], time_varying_unknown_categoricals=[], time_varying_unknown_reals=["volume"], ) validation = TimeSeriesDataSet.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) # Create dataloaders batch_size = 16 train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0) val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0) # Define model and trainer pl.seed_everything(42) trainer = Trainer( gpus=0, # clipping gradients is a hyperparameter and important to prevent divergance # of the gradient for recurrent neural networks gradient_clip_val=0.1, ) tft = TemporalFusionTransformer.from_dataset( training, learning_rate=0.03, hidden_size=16, lstm_layers=1, dropout=0.1, hidden_continuous_size=8, 
output_size=7, # 7 quantiles by default loss=SMAPE(), log_interval=10, reduce_on_plateau_patience=4, ) # Fit the model trainer.fit( tft, train_dataloader=train_dataloader, val_dataloaders=val_dataloader, ) # Evaluate the model raw_predictions, x = tft.predict(val_dataloader, mode="raw", return_x=True)

Conclusion
In summary, multiple strategies can enrich time series analysis. It begins with transforming the data and the task to glean more insight into the nature of the data, followed by integrating time into your features and targets. Modeling can then start with standard baselines and proceed to, and often culminate with, boosting methods, without hastily reaching for neural networks and transformers. Finally, the joint application of the Dickey-Fuller and Kwiatkowski-Phillips-Schmidt-Shin (KPSS) tests can bolster the reliability of the conclusions about stationarity.

Python
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, kpss

# Assume 'data' is your time series data

# Dickey-Fuller test
result = adfuller(data)
print('ADF Statistic: %f' % result[0])
print('p-value: %f' % result[1])
print('Critical Values:')
for key, value in result[4].items():
    print('\t%s: %.3f' % (key, value))

# KPSS test
result = kpss(data, regression='c')
print('\nKPSS Statistic: %f' % result[0])
print('p-value: %f' % result[1])
print('Critical Values:')
for key, value in result[3].items():
    print('\t%s: %.3f' % (key, value))
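As a rough, illustrative guide to reading the two tests together (reusing data, adfuller, and kpss from the snippet above; the 0.05 threshold is a common convention, not a rule):

Python
adf_p = adfuller(data)[1]               # null hypothesis: the series has a unit root (non-stationary)
kpss_p = kpss(data, regression='c')[1]  # null hypothesis: the series is stationary

if adf_p < 0.05 and kpss_p >= 0.05:
    print('Both tests point towards stationarity')
elif adf_p >= 0.05 and kpss_p < 0.05:
    print('Both tests point towards non-stationarity; differencing may help')
else:
    print('The tests disagree; inspect trend and seasonality before modeling')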
Today, in our modern developer world, it is absolutely impossible to imagine life without technologies such as React, Node.js, GraphQL, and so on. They hold solid, leading positions in data delivery. 70% of the cases I come across are projects that are integrated with GraphQL or are about to migrate to it. More and more companies prefer to use the GraphQL data query syntax, and today it is a piece of must-have knowledge.
GraphQL is a typed query language for APIs, widely used for requesting data from the server side to the client side in an optimized manner. Clients request exactly what they need using a typed schema, which allows the server to send only what was requested instead of a fixed dataset. Apollo Server gives you tools for sending responses to client requests. Apollo Client gives you the ability to consume a GraphQL API, including caching and linking.

What Is It About?
We're going to create two Apollo Servers, which are going to handle the GraphQL schema merge. It's the situation where some external server responds to a GraphQL API and some other service uses its own GraphQL schema that also includes the external one. On the Node layer, we're going to wrap the results from the external server up in one schema and send it to the client. Literally, we're just going to merge two schemas into one and send it to the client.

Let's Dive Into the Code
For the implementation, we're going to use the Node.js environment, Koa middleware, and Apollo Server with GraphQL Tools. We have to run two servers, and both have to run a GraphQL Apollo Server. Here is the diagram.
Time to create the boilerplates and run them both. For that, we need to create two folders: name one folder boilerplate-graphql-koa-server-external and the second folder boilerplate-graphql-koa-server. Before starting, please take a look at the folder structure in both projects. Pretty straightforward. The difference between those two repos is going to be in the code.

Plain Text
├── package.json
└── src
    ├── index.js
    ├── resolvers.js
    └── schema.js

External GraphQL Server
Now, let's set up the boilerplate-graphql-koa-server-external package.json:

JSON
{
  "name": "boilerplate-graphql-koa-server-external",
  "version": "1.0.0",
  "description": "Boilerplate GraphQL Koa server external",
  "main": "src/index.js",
  "scripts": {
    "start": "PORT=4000 node src/index.js"
  },
  "engines": {
    "node": "16.17.x"
  },
  "dependencies": {
    "@graphql-tools/schema": "^9.0.2",
    "@koa/cors": "^3.4.1",
    "apollo-server-core": "^3.10.2",
    "apollo-server-koa": "^3.10.2",
    "graphql": "^15.8.0",
    "koa": "^2.13.4",
    "koa-graphql": "^0.12.0"
  }
}

Then let's create the server itself.
In the src folder, in the index.js file, add the server setup:

JavaScript
const Koa = require('koa');
const http = require('http');
const cors = require('@koa/cors');
const { ApolloServer } = require('apollo-server-koa');
const { makeExecutableSchema } = require('@graphql-tools/schema');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');

async function server({ typeDefs, resolvers }) {
  const app = new Koa();
  const httpServer = http.createServer();

  const apolloServer = new ApolloServer({
    introspection: true,
    schema: makeExecutableSchema({
      typeDefs,
      resolvers,
    }),
  });

  await apolloServer.start();
  apolloServer.applyMiddleware({ app, path: '/api/v1/graphql' });
  httpServer.on('request', app.callback());

  await new Promise(resolve => httpServer.listen({ port: process.env.PORT }, resolve));

  console.log(
    `External Server ready at http://localhost:${process.env.PORT}${apolloServer.graphqlPath}`
  );

  return { apolloServer, app };
}

server({ typeDefs, resolvers }).then(({ app }) => {
  app.use(cors());
});

The async function server takes care of the Koa app itself, and we create the Apollo Server with an executable schema, where we have to provide the types from the schema and the resolvers. According to the official docs, we must call apolloServer.start() before apolloServer.applyMiddleware. This allows us to identify potential issues and take action if Apollo Server crashes at startup, instead of starting to serve requests.
The second part of boilerplate-graphql-koa-server-external is to set up the schema and resolvers.

JavaScript
const { gql } = require('apollo-server-koa');

module.exports = gql`
  type Query {
    getItemsExternal: [DataExternalExample]
  }

  type DataExternalExample {
    id: ID
    label: String
  }

  type Mutation {
    updateDataExternal(label: String!): DataExternalExample!
  }
`;

Resolvers for the schema:

JavaScript
const fakeData = {
  id: 223421,
  label: 'Some Label From External',
};

module.exports = {
  Query: {
    getItemsExternal: () => [fakeData],
  },
  Mutation: {
    updateDataExternal: (_, { label }) => {
      return {
        ...fakeData,
        label,
      };
    },
  },
};

Now it's time to check the server responses. Before that, don't forget to install the packages with npm i, then run npm run start and open the URL http://localhost:4000/api/v1/graphql in the Chrome browser. Click on the button "Query your server," and you get to the Apollo GraphQL interface, which allows you to see the requested schema from the server. Open the Introspection Schema page, and you will see our schema there. If you were able to introspect the schema, it means we are done with our boilerplate-graphql-koa-server-external.

GraphQL Server for Merging Schemas
Let's move on now to the boilerplate-graphql-koa-server setup. The package.json is almost the same as the external one, but with additional packages and a different PORT, name, and description.

JSON
{
  "name": "boilerplate-graphql-koa-server",
  "description": "Boilerplate GraphQL Koa server",
  "scripts": {
    "start": "PORT=3000 node src/index.js"
  },
  "dependencies": {
    "@graphql-tools/load": "^7.7.5",
    "@graphql-tools/url-loader": "^7.14.1"
  }
}

Let's set up the new schema right away. It is pretty much the same, but with slightly different data in the schema.

JavaScript
const { gql } = require('apollo-server-koa');

module.exports = gql`
  type Query {
    getFakeDataExample: DataExample
  }

  type DataExample {
    id: ID
    value: String
  }

  type Mutation {
    updateFakeData(value: String!): DataExample!
  }
`;

And the resolvers:

JavaScript
const fakeData = {
  id: 4838745,
  value: 'Some Random String',
};

module.exports = {
  Query: {
    getFakeDataExample: () => fakeData,
  },
  Mutation: {
    updateFakeData: (_, { value }) => {
      return {
        ...fakeData,
        value,
      };
    },
  },
};

And now, let's take a look at the server file. You will notice that it's largely the same, except for a few lines of code. First of all, we use loadSchema to fetch the external schema from EXTERNAL_ENDPOINT, which is our first launched server, together with UrlLoader as the loader for that schema. Most importantly, we have to be sure that the schema has been loaded and that the external server doesn't throw any errors; we have to catch that situation. As you can see in the code, we simply keep an array of schemas. By default, it holds only our own internalSchema; then, if the external server is available, we push externalSchema to that array and use the mergeSchemas tool, which provides the merged schema directly to the ApolloServer.

JavaScript
const Koa = require('koa');
const http = require('http');
const cors = require('@koa/cors');
const { ApolloServer } = require('apollo-server-koa');
const { loadSchema } = require('@graphql-tools/load');
const { UrlLoader } = require('@graphql-tools/url-loader');
const { makeExecutableSchema, mergeSchemas } = require('@graphql-tools/schema');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');

const EXTERNAL_ENDPOINT = 'http://localhost:4000/api/v1/graphql';

async function server({ typeDefs, resolvers }) {
  const app = new Koa();
  const httpServer = http.createServer();

  const internalSchema = makeExecutableSchema({
    typeDefs,
    resolvers,
  });

  const schemas = [internalSchema];

  try {
    const externalSchema = await loadSchema(EXTERNAL_ENDPOINT, {
      loaders: [new UrlLoader()],
    });
    schemas.push(externalSchema);
  } catch {
    console.warn('⚠️️ External Schema has not been loaded');
  }

  const apolloServer = new ApolloServer({
    introspection: true,
    schema: mergeSchemas({
      schemas,
    }),
  });

  await apolloServer.start();
  apolloServer.applyMiddleware({ app, path: '/api/v1/graphql' });
  httpServer.on('request', app.callback());

  await new Promise(resolve => httpServer.listen({ port: process.env.PORT }, resolve));

  console.log(`Server ready at http://localhost:${process.env.PORT}${apolloServer.graphqlPath}`);

  return { apolloServer, app };
}

server({ typeDefs, resolvers }).then(({ app }) => {
  app.use(cors());
});

Install all packages and run the server, which will be available on PORT=3000. Let's go to the same Apollo GraphQL interface, but with the proper PORT in the URL: http://localhost:3000/api/v1/graphql. Now, if we open the Introspection Schema page, we're able to see the merged schemas: one from the external server and another from the newly created one. Keep in mind that if two of your schemas declare the same field, the GraphQL server will raise an error like this:

Plain Text
Error: Unable to merge GraphQL type "Query": Field "getFakeDataExample" already defined with a different type. Declared as "DataExample", but you tried to override with "DataExternalExample"

It means we have to be very careful with our field and type definitions in a GraphQL schema, in order not to end up in the awkward situation where a type or field already exists.

Conclusion
Numerous organizations are adopting a microservice architecture and attempting to isolate the data logic flow.
The approach outlined above is particularly useful in situations where microservices communicate with each other within a company. Specifically, when there is a primary global service with a default schema and a secondary microservice with extra fields that may be used by the client in the future, this method allows developers to manage and scale their microservices more efficiently, thereby increasing the overall performance and agility of the system.

GitHub Repos
https://github.com/antonkalik/boilerplate-graphql-koa-server
https://github.com/antonkalik/boilerplate-graphql-koa-server-external
I explained the concepts and theory behind Data Residency in a previous post. It's time to get our hands dirty and implement it in a simple demo.

The Sample Architecture
In the last section of the previous post, I proposed a sample architecture where location-based routing happens at two different stages:
The API Gateway checks for an existing X-Country header. Depending on its value, it forwards the request to the computed upstream; if no value is found or no value matches, it forwards it to a default upstream.
The application uses Apache ShardingSphere to route again, depending on the data. If the value computed by the API Gateway is correct, the flow stays "in its lane"; if not, it's routed to the correct database, but with a performance penalty as it's outside its lane.
I simplified some aspects:
The theory uses two API Gateway instances. For the demo, I used only one.
Remember that the location isn't set client-side on the first request. It should be returned along with the first response, stored, and reused by the client on subsequent calls. I didn't bother with implementing the client.
I like my demos to be self-contained, so I didn't use any Cloud Provider.
Here's the final component diagram. The data model is simple. We insert location-specific data into each database:

SQL
INSERT INTO europe.owner VALUES ('dujardin', 'fr', 'Jean Dujardin');
INSERT INTO europe.thingy VALUES (1, 'Croissant', 'dujardin');
INSERT INTO usa.owner VALUES ('wayne', 'us', 'John Wayne');
INSERT INTO usa.thingy VALUES (2, 'Lasso', 'wayne');

Finally, we develop a straightforward RESTful API to fetch thingies:
GET /thingies/
GET /thingies/{id}
Now that we have set the stage, let's check how to implement routing at the two levels.

Routing on Apache ShardingSphere
Apache ShardingSphere offers two approaches: as a library inside the application, ShardingSphere-JDBC, or as a full-fledged deployable component, ShardingSphere-Proxy. You can also combine both. I chose the former because it's the easiest to set up. For a comparison between them, please check this table.
The first step is to add the dependency to the POM:

XML
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-jdbc-core</artifactId>
    <version>5.3.2</version>
</dependency>

ShardingSphere-JDBC acts as an indirection layer between the application and the data sources. We must configure the framework to use it. For Spring Boot, it looks like the following:

YAML
spring:
  datasource:
    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver #1
    url: jdbc:shardingsphere:absolutepath:/etc/sharding.yml                  #2-3

1. JDBC-compatible ShardingSphere driver
2. Configuration file
3. Opposite to what the documentation tells you, the full prefix is jdbc:shardingsphere:absolutepath. I've opened a PR to fix the documentation.
The next step is to configure ShardingSphere itself with the data sources: YAML dataSources: #1 europe: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: org.postgresql.Driver jdbcUrl: "jdbc:postgresql://dbeurope:5432/postgres?currentSchema=europe" username: postgres password: root usa: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: org.postgresql.Driver jdbcUrl: "jdbc:postgresql://dbusa:5432/postgres?currentSchema=usa" username: postgres password: root rules: #2 - !SHARDING tables: owner: #3 actualDataNodes: europe.owner,usa.owner #4 tableStrategy: standard: shardingColumn: country #3 shardingAlgorithmName: by_country #5 shardingAlgorithms: by_country: type: CLASS_BASED #6 props: strategy: STANDARD algorithmClassName: ch.frankel.blog.dataresidency.LocationBasedSharding #7 Define the two data sources, europe and usa Define rules. Many rules are available; we will only use sharding to split data between Europe and USA locations Sharding happens on the country column of the owner table Actual shards Algorithm to use. ShardingSphere offers a couple of algorithms out-of-the-box, which generally try to balance data equally between the sources.As we want a particular split, we define our own Set the algorithm type Reference the custom algorithm class The final step is to provide the algorithm's code: Kotlin class LocationBasedSharding : StandardShardingAlgorithm<String> { //1 override fun doSharding(targetNames: MutableCollection<String>, shardingValue: PreciseShardingValue<String>) = when (shardingValue.value) { //2 "fr" -> "europe" "us" -> "usa" else -> throw IllegalArgumentException("No sharding over ${shardingValue.value} defined") } } Inherit from StandardShardingAlgorithm, where T is the data type of the sharding column. Here, it's country Based on the sharding column's value, return the name of the data source to use With all of the above, the application will fetch thingies in the relevant data source based on the owner's country. Routing on Apache APISIX We should route as early as possible to avoid an application instance in Europe fetching US data. In our case, it translates to routing at the API Gateway stage. I'll use APISIX standalone mode for configuration. Let's define the two upstreams: YAML upstreams: - id: 1 nodes: "appeurope:8080": 1 - id: 2 nodes: "appusa:8080": 1 Now, we shall define the routes where the magic happens: YAML routes: - uri: /thingies* #1 name: Europe upstream_id: 1 vars: [["http_x-country", "==", "fr"]] #2 priority: 3 #3 - uri: /thingies* #4 name: USA upstream_id: 2 vars: [["http_x-country", "==", "us"]] priority: 2 #3 - uri: /thingies* #5 name: default upstream_id: 1 priority: 1 #3 Define the route to the Europe-located app APISIX matches the HTTP methods, the URI, and the conditions. Here, the condition is that the X-Country header has the fr value APISIX evaluates matching in priority order, starting with the highest priority. If the request doesn't match, e.g., because the header doesn't have the set value, it evaluates the next route in the priority list. Define the route to the USA-located app Define a default route The first request carries no header; APISIX forwards it to the default route, where ShardingSphere finds data in the relevant data source. Subsequent requests set the X-Country header because the response to the first request carries the information, and the client has stored it. Remember that it's outside the scope of the demo. 
In most cases, it's set to the correct location; hence, the request will stay "in its lane." If not, the configured routing will still find the data in the appropriate location, at the cost of increased latency to fetch data from the other lane.

Observing the Flow in Practice
It's always a good idea to check that the design behaves as expected. We can use OpenTelemetry for this. For more information on how to set up OpenTelemetry in such an architecture, please refer to End-to-end tracing with OpenTelemetry. Note that Apache ShardingSphere supports OpenTelemetry but doesn't provide the binary agent; you need to build it from source. I admit I was too lazy to do it.
Let's start with a headerless request:

Shell
curl localhost:9080/thingies/1

It uses the default route defined in APISIX and returns the correct data, thanks to ShardingSphere. Now, let's set the country to fr, which is correct:

Shell
curl -H 'X-Country: fr' localhost:9080/thingies/1

APISIX correctly forwards the request to the Europe-located app. Finally, imagine a malicious actor changing the header to get their hands on data that is located in the US:

Shell
curl -H 'X-Country: us' localhost:9080/thingies/1

APISIX forwards it to the USA-located app according to the header. However, ShardingSphere still fetches the data from Europe.

Conclusion
In the previous post, I explained the concepts behind Data Residency. In this post, I implemented it within a simple architecture, thanks to Apache APISIX and Apache ShardingSphere. The demo simplifies reality but should be an excellent foundation for building your production-grade Data Residency architecture.
The complete source code for this post can be found on GitHub.
To go further:
Apache ShardingSphere Sharding YAML configuration
How to filter route by Nginx builtin variable