Welcome to the Data Engineering category of DZone, where you will find all the information you need for AI/ML, big data, data, databases, and IoT. As you determine the first steps for new systems or reevaluate existing ones, you're going to require tools and resources to gather, store, and analyze data. The Zones within our Data Engineering category contain resources that will help you expertly navigate through the SDLC Analysis stage.
Artificial intelligence (AI) and machine learning (ML) are two fields that work together to create computer systems capable of perception, recognition, decision-making, and translation. Separately, AI is the ability of a computer system to mimic human intelligence through math and logic, while ML builds on AI by developing methods that "learn" through experience rather than explicit instruction. In the AI/ML Zone, you'll find resources ranging from tutorials to use cases that will help you navigate this rapidly growing field.
Big data comprises datasets that are massive, varied, and complex and that can't be handled with traditional approaches. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Data is at the core of software development. Think of it as information stored in anything from text documents and images to entire software programs, and these bits of information need to be processed, read, analyzed, stored, and transported throughout systems. In this Zone, you'll find resources covering the tools and strategies you need to handle data properly.
A database is a collection of structured data that is stored in a computer system, and it can be hosted on-premises or in the cloud. As databases are designed to enable easy access to data, our resources are compiled here for smooth browsing of everything you need to know from database management systems to database languages.
IoT, or the Internet of Things, is a technological field that makes it possible for users to connect devices and systems and exchange data over the internet. Through DZone's IoT resources, you'll learn about smart devices, sensors, networks, edge computing, and many other technologies — including those that are now part of the average person's daily life.
Enterprise AI
In recent years, artificial intelligence has become less of a buzzword and more of an adopted process across the enterprise. With that comes a growing need to increase operational efficiency as customer demands rise. AI platforms have become increasingly sophisticated, creating the need to establish guidelines and ownership. In DZone's 2022 Enterprise AI Trend Report, we explore MLOps, explainability, and how to select the best AI platform for your business. We also share a tutorial on how to create a machine learning service using Spring Boot and how to deploy AI with an event-driven platform. The goal of this Trend Report is to better inform the developer audience on practical tools and design paradigms, new technologies, and the overall operational impact of AI within the business. This is a technology space that's constantly shifting and evolving. As part of our December 2022 re-launch, we've added new articles pertaining to knowledge graphs, a solutions directory for popular AI tools, and more.
Distributed SQL Essentials
ChatGPT has taken the world by storm, and this week, OpenAI released the ChatGPT API. I've spent some time playing with ChatGPT in the browser, but the best way to really get on board with these new capabilities is to try building something with it. With the API available, now is that time. This was inspired by Greg Baugues's implementation of a chatbot command line interface (CLI) in 16 lines of Python. I thought I'd start by trying to build the same chatbot but using JavaScript. (It turns out that Ricky Robinett also had this idea and published his bot code here. It's pleasing to see how similar the implementations are!)

The Code

It turns out that Node.js requires a bit more code to deal with command line input than Python, so where Greg's version was 16 lines, mine takes 31. Having built this little bot, I'm no less excited about the potential for building with this API though. Here's the full code. I'll explain what it is doing further down.

import { createInterface } from "node:readline/promises";
import { stdin as input, stdout as output, env } from "node:process";
import { Configuration, OpenAIApi } from "openai";

const configuration = new Configuration({ apiKey: env.OPENAI_API_KEY });
const openai = new OpenAIApi(configuration);
const readline = createInterface({ input, output });

const chatbotType = await readline.question(
  "What type of chatbot would you like to create? "
);
const messages = [{ role: "system", content: chatbotType }];
let userInput = await readline.question("Say hello to your new assistant.\n\n");

while (userInput !== ".exit") {
  messages.push({ role: "user", content: userInput });
  try {
    const response = await openai.createChatCompletion({
      messages,
      model: "gpt-3.5-turbo",
    });
    const botMessage = response.data.choices[0].message;
    if (botMessage) {
      messages.push(botMessage);
      userInput = await readline.question("\n" + botMessage.content + "\n\n");
    } else {
      userInput = await readline.question("\nNo response, try asking again\n");
    }
  } catch (error) {
    console.log(error.message);
    userInput = await readline.question("\nSomething went wrong, try asking again\n");
  }
}

readline.close();

Let's dig into how it works and how you can build your own.

Building a Chatbot

You will need an OpenAI platform account to interact with the ChatGPT API. Once you have signed up, create an API key from your account dashboard. As long as you have Node.js installed, the only other thing you'll need is the openai Node.js module. Let's start a Node.js project and create this CLI application. First, create a directory for the project, change into it, and initialize it with npm:

mkdir chatgpt-cli
cd chatgpt-cli
npm init --yes

Install the openai module as a dependency:

npm install openai

Open package.json and add the key "type": "module" to the configuration, so we can build this as an ES module, which will allow us to use top-level await. Create a file called index.js and open it in your editor.

Interacting With the OpenAI API

There are two parts to the code: dealing with input and output on the command line and dealing with the OpenAI API. Let's start by looking at how the API works. First, we import two objects from the openai module, the Configuration and OpenAIApi. The Configuration class will be used to create a configuration that holds the API key; you can then use that configuration to create an OpenAIApi client.
import { env } from "node:process";
import { Configuration, OpenAIApi } from "openai";

const configuration = new Configuration({ apiKey: env.OPENAI_API_KEY });
const openai = new OpenAIApi(configuration);

In this case, we'll store the API key in the environment and read it with env.OPENAI_API_KEY. To interact with the API, we now use the OpenAI client to create chat completions for us. OpenAI's text-generating models don't actually converse with you, but are built to take input and come up with plausible-sounding text that would follow that input, a completion. With ChatGPT, the model is configured to receive a list of messages and then come up with a completion for the conversation. Messages in this system can come from one of three different entities: the "system," the "user," and the "assistant." The "assistant" is ChatGPT itself, the "user" is the person interacting, and the system allows the program (or the user, as we'll see in this example) to provide instructions that define how the assistant behaves. Changing the system prompts for how the assistant behaves is one of the most interesting things to play around with and allows you to create different types of assistants.

With our openai object configured as above, we can create messages to send to an assistant and request a response like this:

const messages = [
  { role: "system", content: "You are a helpful assistant" },
  {
    role: "user",
    content: "Can you suggest somewhere to eat in the centre of London?",
  },
];
const response = await openai.createChatCompletion({
  messages,
  model: "gpt-3.5-turbo",
});
console.log(response.data.choices[0].message);
// => "Of course! London is known for its diverse and delicious food scene..."

As the conversation goes on, we can add the user's questions and the assistant's responses to the messages array, which we send with each request. That gives the bot the history of the conversation, the context on which it can build further answers. To create the CLI, we just need to hook this up to user input in the terminal.

Interacting With the Terminal

Node.js provides the Readline module, which makes it easy to receive input and write output to streams. To work with the terminal, those streams will be stdin and stdout. We can import stdin and stdout from the node:process module, renaming them to input and output to make them easier to use with Readline. We also import the createInterface function from node:readline:

import { createInterface } from "node:readline/promises";
import { stdin as input, stdout as output } from "node:process";

We then pass the input and output streams to createInterface, and that gives us an object we can use to write to the output and read from the input, all with the question function:

const readline = createInterface({ input, output });

const chatbotType = await readline.question(
  "What type of chatbot would you like to create? "
);

The above code hooks up the input and output streams. The readline object is then used to post the question to the output and return a promise. When the user replies by writing into the terminal and pressing return, the promise resolves with the text that the user wrote.

Completing the CLI

With both of those parts, we can write all of the code. Create a new file called index.js and enter the code below.
We start with the imports we described above:

import { createInterface } from "node:readline/promises";
import { stdin as input, stdout as output, env } from "node:process";
import { Configuration, OpenAIApi } from "openai";

Then we initialize the API client and the Readline module:

const configuration = new Configuration({ apiKey: env.OPENAI_API_KEY });
const openai = new OpenAIApi(configuration);
const readline = createInterface({ input, output });

Next, we ask the first question of the user: "What type of chatbot would you like to create?" We will use the answer to create a "system" message in a new array of messages that we will continue to add to as the conversation goes on.

const chatbotType = await readline.question(
  "What type of chatbot would you like to create? "
);
const messages = [{ role: "system", content: chatbotType }];

We then prompt the user to start interacting with the chatbot and start a loop that says: while the user input is not equal to the string ".exit", keep sending that input to the API. If the user enters ".exit", the program will end, like in the Node.js REPL.

let userInput = await readline.question("Say hello to your new assistant.\n\n");

while (userInput !== ".exit") {
  // loop
}

readline.close();

Inside the loop, we add the userInput to the messages array as a "user" message. Then, within a try/catch block, we send it to the OpenAI API. We set the model as "gpt-3.5-turbo", which is the underlying name for ChatGPT. When we get a response from the API, we get the message out of the response.data.choices array. If there is a message, we store it as an "assistant" message in the array of messages and output it to the user, waiting for their input again using readline. If there is no message in the response from the API, we alert the user and wait for further user input. Finally, if there is an error making a request to the API, we catch the error, log the message, and tell the user to try again.

while (userInput !== ".exit") {
  messages.push({ role: "user", content: userInput });
  try {
    const response = await openai.createChatCompletion({
      messages,
      model: "gpt-3.5-turbo",
    });
    const botMessage = response.data.choices[0].message;
    if (botMessage) {
      messages.push(botMessage);
      userInput = await readline.question("\n" + botMessage.content + "\n\n");
    } else {
      userInput = await readline.question("\nNo response, try asking again\n");
    }
  } catch (error) {
    console.log(error.message);
    userInput = await readline.question(
      "\nSomething went wrong, try asking again\n"
    );
  }
}

Put that all together, and you have your assistant. The full code is at the top of this post or on GitHub. You can now run the assistant by passing it your OpenAI API key as an environment variable on the command line:

OPENAI_API_KEY=YOUR_API_KEY node index.js

This will start your interaction with the assistant, starting with it asking what kind of assistant you want. Once you've declared that, you can start chatting with it.

Experimenting Helps Us to Understand

Personally, I'm not actually sure how useful ChatGPT is. It is clearly impressive; its ability to return text that reads as if it was written by a human is incredible. However, it returns content that is not necessarily correct, regardless of how confidently it presents that content. Experimenting with ChatGPT is the only way that we can try to understand what it is useful for, thus building a simple chatbot like this gives us grounds for that experiment.
Learning that the system commands can give the bot different personalities and make it respond in different ways is very interesting. You might have heard, for example, that you can ask ChatGPT to help you with programming, but you could also specify a JSON structure and effectively use it as an API as well. But as you experiment with that, you will likely find that it should not be an information API, but more likely something you can use to understand your natural text and turn it into a JSON object. To me, this is exciting, as it means that ChatGPT could help create more natural voice assistants that can translate meaning from speech better than the existing crop that expects commands to be given in a more exact manner. I still have experimenting to do with this idea, and having this tool gives me that opportunity. This Is Just the Beginning If experimenting with this technology is the important thing for us to understand what we can build with it and what we should or should not build with it, then making it easier to experiment is the next goal. My next goal is to expand this tool so that it can save, interact with, and edit multiple assistants so that you can continue to work with them and improve them over time. In the meantime, you can check out the full code for this first assistant in GitHub, and follow the repo to keep up with improvements.
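The chatbot above is written in JavaScript, but the "natural text to JSON" idea mentioned earlier is easy to sketch in Python as well. The snippet below is a rough illustration only: it assumes the pre-1.0 openai Python package (the same API generation this article targets), and the prompt wording, field names, and example request are my own assumptions, not part of the original project.

import json
import os

import openai  # assumes the pre-1.0 openai package (openai.ChatCompletion)

openai.api_key = os.environ["OPENAI_API_KEY"]

messages = [
    {
        "role": "system",
        "content": (
            "Extract the city and cuisine from the user's request. "
            'Reply only with JSON like {"city": "...", "cuisine": "..."}.'
        ),
    },
    {"role": "user", "content": "Somewhere for great ramen in central London, please."},
]

response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages)
reply = response["choices"][0]["message"]["content"]

# The model is instructed, not guaranteed, to return JSON, so parsing may fail.
print(json.loads(reply))

Used this way, the model acts as a natural-language-to-structure layer rather than as a source of facts, which is exactly the distinction drawn above.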
We’ve been hearing that the Internet of Things (IoT) would transform the way we live and work by connecting everyday devices to the internet for a long time now. While much of the promise of the IoT always seems to be "coming soon," the proliferation of IoT devices has already created a massive amount of data that needs to be processed, stored, and analyzed, in real-time. I’ve said for years—actually over a decade now—that if your IoT data isn’t timely, accurate, and actionable, you’re mostly wasting your time in collecting it. This is where the Apache Pinot® database comes in. Pinot is an open-source, distributed data store designed for real-time analytics. The high scalability, reliability, and low latency query response times of Pinot make it a great solution for processing massive amounts of IoT data. In this post, we will explore the benefits of using Pinot in IoT applications. IoT devices generate a massive amount of data, and traditional databases are not equipped to handle the scale and complexity. I’ve used a lot of solutions to collect, store and analyze IoT data, but Pinot is specifically designed for handling high-velocity data streams in real-time. With Pinot, IoT data can be ingested, processed, and analyzed in real-time. In addition to real-time processing, Pinot offers scalability and reliability. As the number of IoT devices and the amount of data they generate continues to grow, it becomes critical to have a system that can scale horizontally to handle the increasing load. Pinot can scale easily by adding more nodes to the cluster, and it also provides fault tolerance, ensuring that data is not lost in the event of a node failure. Some Background What Is IoT? If we’re going to talk about IoT and Pinot, it’s probably best to give at least a bit of context on what IoT actually is and is not. IoT, short for the Internet of Things, refers to a network of physical devices, vehicles, home appliances, and other items embedded with sensors, software, and network connectivity. These devices can communicate with each other and share data over the internet. The range of IoT devices is diverse, ranging from smartwatches and fitness trackers to smart home devices like thermostats and security cameras to industrial machines and city infrastructure. The IoT market is expected to grow rapidly in the coming years, with estimates suggesting that there will be over 27 billion IoT devices by 2025. The significance of IoT lies in the ability to collect and analyze data from a wide range of sources in real-time. This data can be used to gain insights, optimize processes, improve decision-making, and enhance user experiences. For example, in the healthcare industry, IoT devices can monitor vital signs and other health metrics, alerting doctors or caregivers in case of abnormal readings. In the retail industry, IoT sensors can track inventory levels and customer behavior, enabling retailers to optimize store layouts and product offerings. Some retail establishments are already using IoT devices to handle increases or decreases in customer traffic in stores. In the transportation industry, IoT devices can monitor vehicle performance and location, enabling fleet managers to improve efficiency and safety. 
Most modern cars are already equipped with IoT devices that can monitor and report on a wide range of vehicle metrics, including fuel consumption, tire pressure, and engine performance, and almost all over-the-road trucks are already reporting vast amounts of telemetry data to their fleet managers.

What Is Apache Pinot?

Pinot is an open-source distributed data store that is purpose-built for real-time analytics. Originally developed at LinkedIn, Pinot has since become an Apache Software Foundation project and is used by a growing number of companies and organizations for a variety of use cases. Pinot is designed to handle large volumes of data in real-time and provides sub-second query latencies, making it ideal for use cases that require real-time analytics, such as IoT.

One of the key features of Pinot is its distributed architecture. Pinot is designed to be horizontally scalable, which means that it can handle increasing amounts of data by adding more nodes to the cluster. This distributed architecture also provides fault tolerance, which means that it can continue to function even if one or more nodes in the cluster fail.

Pinot stores data in columnar format, which allows for highly efficient querying and analysis. By storing data in columns rather than rows, Pinot can quickly scan through large amounts of data and compute the aggregations or other complex calculations required for IoT data analysis. Pinot provides support for a variety of data types, including numerical, text, JSON, and geospatial data. It allows for nested queries, which can be useful for analyzing complex IoT data sets, and an emerging feature of generalized joins will make these query options even more powerful. Overall, Pinot is a powerful tool for analyzing and managing IoT data in real time.

Advantages of Using Apache Pinot With IoT

When it comes to using Pinot with IoT, there are a number of use cases and scenarios where the two technologies can be effectively combined. For example, in the industrial IoT space, Pinot can be used to analyze sensor data from manufacturing equipment to optimize performance and improve efficiency. Analyzing data from industrial equipment in real-time allows for much better predictive maintenance, more efficient usage patterns, and overall better utilization of resources.

If you're going to use Pinot with IoT, the first step is to identify the data sources that will be ingested into Pinot. In reality, you'll want to back up even further and analyze the types of insights and efficiencies you're looking for in your deployment. Once you've done this, you can begin to design the kind of data you'll want to collect in order to facilitate those insights. This can include data from sensors, gateways, and other IoT devices. Once the data sources have been identified, Pinot can be configured to ingest the data in real time, processing and analyzing it as it is received. Once you've begun to ingest your data into Pinot, you can query it using SQL. With your queries in place, you can start identifying patterns in sensor data that can help detect anomalies in equipment performance and track changes in environmental conditions over time.

However, using Apache Pinot with IoT naturally presents data security and privacy challenges. IoT devices are often connected to sensitive systems or contain personal data, making it important to ensure that data is properly secured and protected.
Organizations need to implement robust security measures to protect against unauthorized access and data breaches. Another challenge of using Pinot with IoT is the complexity of the data sets involved. IoT data can be highly complex and heterogeneous, consisting of a variety of data types and formats. This can make it difficult to analyze and extract insights from the data. Organizations need to have a clear understanding of the data they are working with and develop effective data management and analysis strategies to overcome these challenges. Despite these challenges, the benefits of using Pinot with IoT make it a powerful tool for organizations looking to leverage their IoT data. With its real-time analytics capabilities, distributed architecture, and support for complex queries, Pinot is well-suited for managing and analyzing the vast amounts of data generated by IoT devices. By implementing effective data management and security strategies, organizations can unlock the full potential of their IoT data and drive innovation and growth in their respective industries. Use Cases of Apache Pinot With IoT There are various use cases of Pinot with IoT, ranging from predictive maintenance in manufacturing to healthcare monitoring and analysis. Below are some detailed examples of how Pinot can be used in different IoT applications: Predictive maintenance in manufacturing: One of the most promising applications of Pinot in IoT is predictive maintenance in manufacturing. By collecting and analyzing real-time data from sensors and machines, Pinot can help predict when a machine is likely to fail and schedule maintenance before a breakdown occurs. This can improve equipment uptime and reduce maintenance costs. Smart city monitoring and management: Smart city applications are a rapidly expanding use case for IoT. Smart city data from sensors and devices are used to manage various aspects of city infrastructure such as traffic, parking, and waste management. Pinot can help analyze real-time data from multiple sources and provide insights that can be used to optimize city operations and improve citizen services. Real-time tracking and monitoring of vehicles: Another use case of Pinot in IoT is the monitoring and management of fleet vehicles. Pinot can be used to collect and analyze data from GPS trackers, vehicle sensors, and cameras to provide real-time insights into vehicle location, speed, and driving behavior. Combined with Smart City data such as real-time traffic insights, fleet managers can optimize routes, reroute deliveries, and optimize for external factors in real-time. This can help optimize fleet management and improve driver safety. Healthcare monitoring and analysis: Healthcare applications, where data from wearables, sensors, and medical devices can be used to monitor patients and analyze health outcomes in order to improve patient care and reduce errors. Conclusion I hope I have shown you how Pinot can provide you with a powerful toolset for managing and analyzing IoT data in real time. Its distributed architecture and fault-tolerant design make it an ideal choice for organizations looking to scale their data storage and processing capabilities as their IoT data grows. With its support for complex queries and SQL-like query language, Pinot offers a flexible and powerful platform for analyzing complex IoT data sets. As the IoT continues to grow and evolve, Pinot is well-positioned to become an increasingly important tool for managing and analyzing IoT data in real time. 
By embracing this technology and developing effective strategies for managing and analyzing IoT data, organizations can stay ahead of the curve and unlock new opportunities for growth and innovation. Try It Out Yourself Interested in seeing if Apache Pinot is a possible solution for you? Come join the community of users who are implementing Apache Pinot for real-time data analytics. Want to learn even more about it? Then be sure to attend the Real Time Analytics Summit in San Francisco!
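If you want a feel for the SQL-based anomaly queries described earlier before trying Pinot yourself, here is a minimal Python sketch using the pinotdb DB-API client. The broker address is the client's documented default, and the table, column names, and threshold are purely illustrative assumptions, not a real schema.

from pinotdb import connect  # assumes the pinotdb client package is installed

# Connect to a hypothetical Pinot broker and look for unusually hot devices
conn = connect(host="localhost", port=8099, path="/query/sql", scheme="http")
cursor = conn.cursor()
cursor.execute(
    """
    SELECT deviceId, AVG(temperature) AS avg_temp
    FROM sensor_readings          -- hypothetical table of IoT readings
    WHERE ts > ago('PT1H')        -- assumes ts is an epoch-millisecond column
    GROUP BY deviceId
    HAVING AVG(temperature) > 80
    """
)
for device_id, avg_temp in cursor:
    print(device_id, avg_temp)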
Amazon Simple Storage Service (S3) is a highly scalable, durable, and secure object storage service offered by Amazon Web Services (AWS). S3 allows businesses to store and retrieve any amount of data from anywhere on the web by making use of its enterprise-level services. S3 is designed to be highly interoperable and integrates seamlessly with other AWS and third-party tools and technologies to process data stored in Amazon S3. One of these is Amazon EMR (Elastic MapReduce), which allows you to process large amounts of data using open-source tools such as Spark.

Apache Spark is an open-source distributed computing system used for large-scale data processing. Spark is built for speed and supports various data sources, including Amazon S3. Spark provides an efficient way to process large amounts of data and perform complex computations in minimal time.

Memphis.dev is a next-generation alternative to traditional message brokers: a simple, robust, and durable cloud-native message broker wrapped with an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases. The common pattern of message brokers is to delete messages after they pass the defined retention policy, like time/size/number of messages. Memphis offers a 2nd storage tier for longer, possibly infinite retention of stored messages. Each message that is expelled from the station automatically migrates to the 2nd storage tier, which, in this case, is AWS S3.

In this tutorial, you will be guided through the process of setting up a Memphis station with a 2nd storage class connected to AWS S3 and an environment on AWS, followed by creating an S3 bucket, setting up an EMR cluster, installing and configuring Apache Spark on the cluster, preparing data in S3 for processing, processing data with Apache Spark, best practices, and performance tuning.

Setting Up the Environment

Memphis

1. To get started, first install Memphis.
2. Enable AWS S3 integration via the "Memphis integration center."
3. Create a station (topic), and choose a retention policy. Each message passing the configured retention policy will be offloaded to an S3 bucket.
4. Check the newly configured AWS S3 integration as 2nd storage class by clicking "Connect."
5. Start producing events into your newly created Memphis station.

Create an AWS S3 Bucket

If you haven't done so already, you need to create an AWS account on AWS' official page. Next, create an S3 bucket where you can store your data. You can use the AWS Management Console, the AWS CLI, or an SDK to create a bucket. For this tutorial, you will use the AWS Management Console. Click on "Create bucket," then proceed to create a bucket name complying with the naming convention and choose the region where you want the bucket to be located. Configure "Object ownership" and "Block all public access" for your use case, and make sure to configure other bucket permissions to allow your Spark application to access the data. Finally, click on the "Create bucket" button to create the bucket.

Setting Up an EMR Cluster With Spark Installed

Amazon Elastic MapReduce (EMR) is a web service based on Apache Hadoop that allows users to cost-effectively process vast amounts of data using big data technologies, including Apache Spark. To create an EMR cluster with Spark installed, open the EMR console and select "Clusters" under "EMR on EC2" on the left side of the page.
Click on "Create cluster" and give the cluster a descriptive name. Under "Application bundle," select "Spark" to install it on your cluster. Scroll down to the "Cluster logs" section and select the checkbox "Publish cluster-specific logs to Amazon S3." This will create a prompt to enter the Amazon S3 location, using the S3 bucket name you created in the previous step followed by /logs, i.e., s3://myawsbucket/logs. The /logs suffix is required by Amazon to create a new folder in your bucket where Amazon EMR can copy the log files of your cluster.

Go to the "Security configuration and permissions" section and input your EC2 key pair, or go with the option to create one. Then click on the dropdown options for "Service role for Amazon EMR" and choose "AWSServiceRoleForSupport." Choose the same dropdown option for "IAM role for instance profile." Refresh the icon if need be to get these dropdown options. Finally, click the "Create cluster" button to launch the cluster and monitor the cluster status to validate that it's been created.

Installing and Configuring Apache Spark on EMR Cluster

After successfully creating an EMR cluster, the next step is to configure Apache Spark on the EMR cluster. EMR clusters provide a managed environment for running Spark applications on AWS infrastructure, making it easy to launch and manage Spark clusters in the cloud. You configure Spark to work with your data and processing needs and then submit Spark jobs to the cluster to process your data.

You can configure Apache Spark on the cluster with the Secure Shell (SSH) protocol. First, you need to authorize SSH connections to your cluster, which was set by default when you created the EMR cluster. A guide on how to authorize SSH connections can be found in the EMR documentation. To create an SSH connection, you need to specify the EC2 key pair that you selected when creating the cluster. Next, connect to the EMR cluster using the Spark shell by connecting to the primary node. You need to fetch the master public DNS of the primary node by:

1. Navigating to the left of the AWS console.
2. Under EMR on EC2, choosing "Clusters."
3. Selecting the cluster whose public DNS name you want to get.

On your OS terminal, input the following command:

ssh hadoop@ec2-###-##-##-###.compute-1.amazonaws.com -i ~/mykeypair.pem

Replace ec2-###-##-##-###.compute-1.amazonaws.com with your master public DNS name and ~/mykeypair.pem with the file and path name of your .pem file. A prompt message will pop up, and your response should be "yes." Type in "exit" to close the SSH command.

Preparing Data for Processing With Spark and Uploading to S3 Bucket

Data processing requires preparation before uploading so that the data is presented in a format Spark can easily process. The format used is influenced by the type of data you have and the analysis you plan to perform. Some formats used include CSV, JSON, and Parquet. Create a new Spark session and load your data into Spark using the relevant API. For instance, use the spark.read.csv() method to read CSV files into a Spark DataFrame.

Amazon EMR, a managed service for Hadoop-ecosystem clusters, can be used to process data. It reduces the need to set up, tune, and maintain clusters. It also features other integrations with Amazon SageMaker, for example, to start a SageMaker model training job from a Spark pipeline in Amazon EMR.

Once your data is ready, you can use the DataFrame.write.format("s3") method to write a Spark DataFrame to the Amazon S3 bucket.
You should have configured your AWS credentials and have write permissions to access the S3 bucket. Indicate the S3 bucket and path where you want to save the data. For example, you can use the df.write.format("s3").save("s3://my-bucket/path/to/data") method to save the data to the specified S3 bucket. Once the data is saved to the S3 bucket, you can access it from other Spark applications or tools, or you can download it for further analysis or processing.

To upload to the bucket, create a folder by choosing the bucket you initially created, choosing the "Actions" button, and clicking on "Create Folder" in the drop-down items. You can now name the new folder. To upload the data files to the bucket:

1. Select the name of the data folder.
2. In the "Upload" files wizard, choose "Add Files."
3. Proceed with the Amazon S3 console directions to upload the files and select "Start Upload."

It's important to follow best practices for securing your data before uploading it to the S3 bucket.

Understanding Data Formats and Schemas

Data formats and schemas are two related, but distinct, concepts that are both important in data management. Data format refers to the organization and structure of data within the database. There are various formats in which to store data, e.g., CSV, JSON, XML, and YAML. These formats define how data should be structured alongside the different types of data and applications applicable to it. Data schemas, on the other hand, are the structure of the database itself. A schema defines the layout of the database and ensures data is stored appropriately; it specifies the views, tables, indexes, types, and other elements. These concepts are important in analytics and the visualization of the database.

Cleaning and Preprocessing Data in S3

It is essential to double-check for errors in your data before processing it. To get started, access the data folder where you saved the data file in your S3 bucket and download it to your local machine. Next, you will load the data into the data processing tool that will be used to clean and preprocess the data. For this tutorial, the preprocessing tool used is Amazon Athena, which helps analyze unstructured and structured data stored in Amazon S3.

1. Go to Amazon Athena in the AWS Console.
2. Click on "Create" to create a new table and then "CREATE TABLE."
3. Type in the path of your data file in the part highlighted as "LOCATION."
4. Follow the prompts to define the schema for the data and save the table.

Now, you can run a query to validate that the data is loaded correctly and then clean and preprocess the data. For example, this query identifies the duplicates present in the data:

SELECT row1, row2, COUNT(*)
FROM table
GROUP BY row1, row2
HAVING COUNT(*) > 1;

This example creates a new table without the duplicates:

CREATE TABLE new_table AS
SELECT DISTINCT *
FROM table;

Finally, export the cleaned data back to S3 by navigating to the S3 bucket and the folder to upload the file.

Understanding the Spark Framework

Apache Spark is an open-source, simple, and expressive cluster computing framework built for rapid development. It runs on the JVM and is written in Scala, with APIs for Java, Python, and R. The core feature of Spark is its in-memory data computing abilities, which speed up the processing of large datasets.

Configuring Spark to Work With S3

To configure Spark to work with S3, begin by adding the Hadoop AWS dependency to your Spark application.
Do this by adding the following line to your build file (e.g., build.sbt for Scala or pom.xml for Java):

libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.1"

Input the AWS access key ID and secret access key in your Spark application by setting the following configuration properties:

spark.hadoop.fs.s3a.access.key <ACCESS_KEY_ID>
spark.hadoop.fs.s3a.secret.key <SECRET_ACCESS_KEY>

Set the following properties using the SparkConf object in your code:

val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
  .set("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")

Set the S3 endpoint URL in your Spark application by setting the following configuration property:

spark.hadoop.fs.s3a.endpoint s3.<REGION>.amazonaws.com

Replace <REGION> with the AWS region where your S3 bucket is located (e.g., us-east-1). A DNS-compatible bucket name is required to grant the S3 client in Hadoop access for the S3 requests. If your bucket name contains dots or underscores, you may need to enable path-style access, since the S3 client in Hadoop uses a virtual host style by default. Set the following configuration property to enable path-style access:

spark.hadoop.fs.s3a.path.style.access true

Lastly, create a Spark session with the S3 configuration by setting the spark.hadoop prefix in the Spark configuration:

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
  .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.<REGION>.amazonaws.com")
  .getOrCreate()

To read data from S3 in Spark, use the spark.read method and specify the S3 path to your data as the input source. Here is an example demonstrating how to read a CSV file from S3 into a DataFrame in Spark:

val spark = SparkSession.builder()
  .appName("ReadDataFromS3")
  .getOrCreate()

val df = spark.read
  .option("header", "true") // Specify whether the first line is the header or not
  .option("inferSchema", "true") // Infer the schema automatically
  .csv("s3a://<BUCKET_NAME>/<FILE_PATH>")

In this example, replace <BUCKET_NAME> with the name of your S3 bucket and <FILE_PATH> with the path to your CSV file within the bucket.

Transforming Data With Spark

Transforming data with Spark typically refers to operations on data to clean, filter, aggregate, and join data. Spark makes available a rich set of APIs for data transformation; they include the DataFrame, Dataset, and RDD APIs. Some of the common data transformation operations in Spark include filtering, selecting columns, aggregating data, joining data, and sorting data. Here's one example of a data transformation operation:

Sorting data: This operation involves sorting data based on one or more columns. The orderBy or sort method on a DataFrame or Dataset is used to sort data based on one or more columns. For example:

val sortedData = df.orderBy(col("age").desc)

Finally, you may need to write the result back to S3 to store the results. Spark provides various APIs to write data to S3, such as DataFrameWriter, DatasetWriter, and RDD.saveAsTextFile. The following is a code example demonstrating how to write a DataFrame to S3 in Parquet format:

val outputS3Path = "s3a://<BUCKET_NAME>/<OUTPUT_DIRECTORY>"

df.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet(outputS3Path)

Replace <BUCKET_NAME> with the name of your S3 bucket and <OUTPUT_DIRECTORY> with the path to the output directory in the bucket.
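For readers working in PySpark rather than Scala, roughly the same configuration, read, and write flow looks like the sketch below. It is an illustration only: the region, bucket, and file paths are placeholders, and in practice you would usually prefer instance-profile credentials over hard-coded keys.

from pyspark.sql import SparkSession

# Build a session with the same s3a settings as the Scala example above
spark = (
    SparkSession.builder
    .appName("ReadDataFromS3")
    .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
    .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.<REGION>.amazonaws.com")
    .getOrCreate()
)

# Read a CSV file from the bucket into a DataFrame
df = (
    spark.read
    .option("header", "true")       # first line is the header
    .option("inferSchema", "true")  # infer column types automatically
    .csv("s3a://<BUCKET_NAME>/<FILE_PATH>")
)

# Write the result back to S3 as Parquet, mirroring the Scala write example
df.write.mode("overwrite").option("compression", "snappy").parquet(
    "s3a://<BUCKET_NAME>/<OUTPUT_DIRECTORY>"
)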
The mode method specifies the write mode, which can be Overwrite, Append, Ignore, or ErrorIfExists. The option method can be used to specify various options for the output format, such as the compression codec. You can also write data to S3 in other formats, such as CSV, JSON, and Avro, by changing the output format and specifying the appropriate options.

Understanding Data Partitioning in Spark

In simple terms, data partitioning in Spark refers to the splitting of the dataset into smaller, more manageable portions across the cluster. The purpose of this is to optimize performance, improve scalability, and, ultimately, improve database manageability. In Spark, data is processed in parallel on several nodes of a cluster. This is made possible by Resilient Distributed Datasets (RDDs), which are collections of large, complex data distributed across the cluster. By default, RDDs are partitioned across various nodes due to their size. To perform optimally, there are ways to configure Spark to make sure jobs are executed promptly and the resources are managed effectively. Some of these include caching, memory management, data serialization, and the use of mapPartitions() over map().

Spark UI is a web-based graphical user interface that provides comprehensive information about a Spark application's performance and resource usage. It includes several pages, such as Overview, Executors, Stages, and Tasks, that provide information about various aspects of a Spark job. Spark UI is an essential tool for monitoring and debugging Spark applications, as it helps identify performance bottlenecks and resource constraints and troubleshoot errors. By examining metrics such as the number of completed tasks, duration of the job, CPU and memory usage, and shuffle data written and read, users can optimize their Spark jobs and ensure they run efficiently.

Conclusion

Processing your data on AWS S3 using Apache Spark is an effective and scalable way to analyze huge datasets. By utilizing the cloud-based storage and computing resources of AWS S3 and Apache Spark, users can process their data fast and effectively without having to worry about architecture management. In this tutorial, we went through setting up an S3 bucket and Apache Spark cluster on AWS EMR, configuring Spark to work with AWS S3, and writing and running Spark applications to process data. We also covered data partitioning in Spark, Spark UI, and optimizing performance in Spark.
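As a concrete illustration of the partitioning controls discussed above, here is a short PySpark sketch. It assumes a DataFrame df like the one read from S3 earlier; the column name and partition counts are arbitrary examples of mine, not values from the tutorial.

# Inspect how many partitions the DataFrame currently has
print(df.rdd.getNumPartitions())

# Redistribute the data by a key column into 16 partitions (triggers a full shuffle)
repartitioned = df.repartition(16, "device_id")  # "device_id" is a hypothetical column

# Reduce the number of partitions without a full shuffle
coalesced = repartitioned.coalesce(4)

# Process a whole partition at a time instead of row by row
def count_rows(rows):
    yield sum(1 for _ in rows)

print(coalesced.rdd.mapPartitions(count_rows).collect())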
As more companies combine Internet of Things (IoT) devices and edge computing capabilities, people are becoming increasingly curious about how they could use artificial intelligence (AI) to optimize those applications. Here are some thought-provoking possibilities. Improving IoT Sensor Inference Accuracy With Machine Learning Technology researchers are still in the early stages of investigating how to improve the performance of edge-deployed IoT sensors with machine learning. Some early applications include using sensors for image-classification tasks or those involving natural language processing. However, one example shows how people are making progress. Researchers at IMDEA Networks recognized that using IoT sensors for specific deep-learning tasks may mean the sensors cannot guarantee specific quality-of-service requirements, such as latency and inference accuracy. However, the people working on this project developed a machine learning algorithm called AMR² to help with this challenge. AMR² utilizes an edge computing infrastructure to make IoT sensor inferences more accurate while enabling faster responses and real-time analyses. Experiments suggested the algorithm improved inference accuracy by up to 40% compared to the results of basic scheduling tasks that did not use the algorithm. They found an efficient scheduling algorithm such as this one is essential for helping IoT sensors work properly when deployed at the edge. A project researcher pointed out that the AMR² algorithm could impact an execution delay if a developer used it for a service similar to Google Photos, which classifies images by the elements they include. A developer could deploy the algorithm to ensure the user does not notice such delays when using the app. Reducing Energy Usage of Connected Devices With AI at the Edge A 2023 study of chief financial officers at tech companies determined 80% expect revenue increases in the coming year. However, that’s arguably most likely to happen if employees understand customers’ needs and provide products or services accordingly. The manufacturers of many IoT devices intend for people to wear those products almost constantly. Some wearables detect if lone workers fall or become distressed or if people in physically demanding roles are becoming too tired and need to rest. In such cases, users must feel confident that their IoT devices will work reliably through their workdays and beyond. That’s one of the reasons why researchers explored how using AI at the edge could improve the energy efficiency of IoT devices deployed to study the effects of a sedentary lifestyle on health and how correct posture could improve outcomes. Any IoT device that captures data about how people live must collect data continuously, requiring few or no instances where information gathering stops because the device runs out of battery. In this case, subjects wore wireless devices powered by coin-cell batteries. Each of these gadgets had inertia sensors to collect accurate data about how much people moved throughout the day. However, the main problem was the batteries only lasted a few hours due to the large volume of data transmitted. For example, research showed a nine-channel motion sensor that reads 50 samples every second produces more than 100 MB of data daily. However, researchers recognized machine learning could enable the algorithms only to transfer critical data from edge-deployed IoT devices to smartphones or other devices that assist people in analyzing the information. 
They proceeded to use a pre-trained recurrent neural network and found the algorithm achieved real-time performance, improving the IoT devices’ functionality. Creating Opportunities for On-Device AI Training Edge computing advancements have opened opportunities to use smart devices in more places. For example, people have suggested deploying smart street lights that turn on and off in response to real-time traffic levels. Tech researchers and enthusiasts are also interested in the increased opportunities associated with AI training that happens directly on edge-deployed IoT devices. This approach could increase those products’ capabilities while reducing energy consumption and improving privacy. An MIT team studied the feasibility of training AI algorithms on intelligent edge devices. They tried several optimization techniques and came up with one that only required 157 KB of memory to train a machine-learning algorithm on a microcontroller. Other lightweight training methods typically require between 300-600 MB of memory, making this innovation a significant improvement. The researchers explained that any data generated for training stays on the device, reducing privacy concerns. They also suggested use cases where the training happens throughout normal use, such as if algorithms learn by what a person types on a smart keyboard. This approach had some undoubtedly impressive results. In one case, the team trained the algorithm for only 10 minutes, which was enough to allow it to detect people in images. This example shows optimization can go in both directions. Although the first two examples here focused on improving how IoT devices worked, this approach enhanced the AI training process. However, suppose developers train algorithms on IoT devices that will eventually use them to perform better. That’s a case where the approach mutually benefits AI algorithms and IoT-edge devices. How Will You Use AI to Improve How IoT-Edge Devices Work? These examples show some of the things researchers focused on when exploring how artificial intelligence could improve the functionality of IoT devices deployed at the edge. Let them provide valuable insights and inspiration about how you might get similar results. It’s almost always best to start with a clearly defined problem you want to solve. Then, start exploring how technology and innovative approaches could help meet that goal.
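Returning to the data-volume figure cited earlier (a nine-channel motion sensor sampling 50 times per second producing more than 100 MB per day), a quick back-of-the-envelope check shows the number is plausible. The 4-bytes-per-sample assumption below is mine, not from the study.

channels = 9               # nine-channel motion sensor
samples_per_second = 50    # sampling rate described above
bytes_per_sample = 4       # assumption: 32-bit values
seconds_per_day = 24 * 60 * 60

bytes_per_day = channels * samples_per_second * bytes_per_sample * seconds_per_day
print(f"{bytes_per_day / 1e6:.0f} MB per day")  # ~156 MB, consistent with the ">100 MB" figure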
Real-time data beats slow data. It is that easy! But what is real-time? The term always needs to be defined when discussing a use case. Apache Kafka is the de facto standard for real-time data streaming. Kafka is good enough for almost all real-time scenarios. But dedicated proprietary software is required for niche use cases. Kafka is NOT the right choice if you need microsecond latency! This article explores the architecture of NASDAQ that combines critical stock exchange trading with low-latency streaming analytics. What Is Real-Time Data Streaming? Apache Kafka is the de facto standard for data streaming. However, every business has a different understanding of real-time data. And Kafka cannot solve every real-time problem. Hard real-time is a deterministic network with zero spikes and zero latency, which is a requirement for embedded systems using programming languages like C, C++, or Rust to implement safety-critical software like flight control systems or collaborative robots (cobots). Apache Kafka is not the right technology for safety-critical latency requirements. Soft real-time is data processing in a non-deterministic network with potential latency spikes. Data is processed in near real-time. That can be microseconds, milliseconds, seconds, or slower. Real-Time Categories for Data Streaming With Apache Kafka I typically see three kinds of real-time use cases. But even here, Apache Kafka does not fit into each category: Critical real-time: Limited set of use cases that require data processing in microseconds. Famous use case: Trading markets in financial services. This is NOT Kafka. Low-latency real-time: Fast data processing is required in tens or hundreds of milliseconds to enable specific use cases. Examples: Sensor analytics in manufacturing, end-to-end data correlation in ride-hailing between mobile apps and backends, and fraud detection in instant payments. This is Kafka. Near real-time: Fast data processing improves the business process but is not mandatory. For instance: Data ingestion (streaming ETL) into a data warehouse is better in seconds than a batch process that runs every night. This is Kafka. Note: this article focuses on Apache Kafka as it is the de facto standard for data streaming. However, the same is true for many complementary or competitive technologies, like Spark Streaming, Apache Flink, Apache Pulsar, or Redpanda. Let’s look at a concrete example of the financial services industry. NASDAQ Market Feeds vs. Real-Time Data Ruchir Vani, the Director of Software Engineering at Nasdaq, presented at the “Current 2022—the Next Generation of Kafka Summit” in Austin, Texas: Improving the Reliability of Market Data Subscription Feeds. The Nasdaq Stock Market (National Association of Securities Dealers Automated Quotations Stock Market) is an American stock exchange based in New York City. It is ranked second on the list of stock exchanges by market capitalization of shares traded, behind the New York Stock Exchange. The exchange platform is owned by Nasdaq, Inc. While most people only know the stock exchange, it is just the tip of the iceberg. Source: Nasdaq, Inc. Nasdaq Cloud Data Service Powered by Apache Kafka The Nasdaq Cloud Data Service has democratized access to financial data for companies, researchers, and educators. These downstream consumers have different requirements and SLAs. The core engine for processing the market data feeds requires sub-15 microsecond latency. This is NOT Kafka but dedicated (expensive) proprietary software. 
Consumers need to be co-located and use optimized applications to leverage data at that speed. Source: Nasdaq, Inc. NASDAQ, Inc. wanted to capture more market share by providing additional services on top of the critical market feed. They built a service on top called Nasdaq Data Link Streaming. Kafka powers this: Source: Nasdaq, Inc. The following architecture shows the combination. Critical real-time workloads run in the Nasdaq Data Centers. The data feeds are replicated to the public cloud for further processing and analytics. Market traders need to co-locate with the critical real-time engine. Other internal and external subscribers (like research and education consumers) consume from the cloud with low latency, in near real-time, or even in batches from the historical data store: Source: Nasdaq, Inc. Real Real-Time Is Proprietary (Expensive) Technology and Rarely Needed Real-time data beats slow data. Apache Kafka and similar technologies, like Apache Flink, Spark Streaming, Apache Pulsar, Redpanda, Amazon Kinesis, Google Pub Sub, RabbitMQ, and so on, enable low latency real-time messaging or data streaming. Apache Kafka became the de facto standard for data streaming because Kafka is good enough for almost all use cases. Most use cases do not even care if end-to-end processing takes 10ms, 100ms, or 500ms (as downstream applications are not built for that speed anyway). Niche scenarios require dedicated technology. Kafka is NOT the right choice if you need microsecond latency! The NASDAQ example showed how critical proprietary technology and low-latency data streaming work very well together. If you want to see more use cases, read my article about low-latency data streaming with Apache Kafka and cloud-native 5G infrastructure. What kind of real-time do you need in your projects? When do you need critical real-time? If you “just” need low latency, what use case is Kafka not good enough for?
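To ground the "low-latency real-time" category described above, here is a minimal Python sketch of a Kafka producer tuned to send immediately rather than batch, using the confluent-kafka client. The broker address and topic name are assumptions for illustration, not anything from the NASDAQ architecture.

from confluent_kafka import Producer

# Low-latency-oriented settings: flush each message out immediately instead of batching
producer = Producer({
    "bootstrap.servers": "localhost:9092",  # assumption: a local broker
    "linger.ms": 0,                         # do not wait to accumulate a batch
    "acks": "1",                            # trade some durability for lower latency
})

def on_delivery(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")

producer.produce(
    "market-ticks",                          # hypothetical topic
    key="ABC",
    value=b'{"symbol": "ABC", "price": 1.23}',
    on_delivery=on_delivery,
)
producer.poll(0)   # serve delivery callbacks
producer.flush()

Even with settings like these, end-to-end latency sits in the millisecond range, which is exactly why the sub-15-microsecond matching engine described above runs on dedicated proprietary software rather than Kafka.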
Workstations, laptops, and smartphones are no longer the only web-enabled electronic devices in workplaces. The rise of wearables and the Internet of Things has added more gadgets that connect to the internet at home. This has created more conveniences but also introduced new problems. One of the problems brought about by the reliance on smart web-enabled electronic devices is the expansion of cyber threats. New devices being added to the enterprise network become new attack surfaces. They create more opportunities for threat actors to exploit vulnerabilities and launch attacks. Things get more complicated with more advanced technologies involved. For example, flaws in IoT solutions serve as leverage for cyber attackers as they gain deep access to operational technology networks. Use After Free: Common, Unnoticed IoT Security Weakness When it comes to the use of IoT devices, there is one security issue many tend to be unaware of: the Use After Free weakness. Also known as CWE-416 or Common Weakness Enumeration #416, this security weakness is relatively prevalent, but many device users are oblivious to its existence because of its highly technical nature. Use After Free is an inherent flaw in software written in C, which is one of the most common programming languages used to write apps for IoT and embedded devices. It happens when a program continues to utilize a memory pointer even when it was already freed. This continued usage of memory is problematic because it creates an opportunity for a threat actor to overwrite one of the pointers with an address to the shellcode. In turn, this allows the execution of arbitrary code. These usually happen without the knowledge of an IoT device user. It’s an IoT security issue that usually affects large industries, especially those in the healthcare, industrial, and device manufacturing fields. Sometimes, even the IT team may not be aware of it unless there are obvious manifestations, such as the crashing of a program and the harmful execution of arbitrary code. Hence, it is a must to know and understand the problem to be mindful of its possible emergence and to address it before it can cause serious problems. Unwanted Consequences If the allocated memory is freed and referenced, an undefined system behavior is created. This usually occurs because of application-level error conditions. It may also happen because the system cannot properly determine which program elements are responsible for the freeing of memory. The modules in a program may not be correctly coordinated or configured. One of the adverse consequences of the Use After Free issue is the crashing or freezing of a program. A program may crash because it tries to merge invalid data with memory blocks that have previously-freed data. These are incompatible elements an operating system cannot reconcile, creating operational anomalies. Another unwanted result of the Use After Free error is data corruption. After the memory has been freed, it can be allocated to another pointer, which means the original pointer to the freed memory is re-utilized. This reutilization points to a new location, which is not how proper memory allocation works. This faulty allocation destroys validly used memory and causes an undefined behavior in the process, and corrupts data. Moreover, the error can lead to the emergence of a write-what-where condition, wherein arbitrary values may be inserted into the freed memory spaces. If this happens, multiple function pointers may be created within heap data. 
An attacker may then execute arbitrary code if they manage to overwrite a function pointer with the address of shellcode within heap data. Reported Cases of the Use After Free Security Weakness CWE-416 is not just some theoretical weakness identified by “overthinking” security analysts. There have been a number of reported cases of it happening. It has not only been encountered by small organizations with resource-constrained IT departments and cybersecurity teams; high-profile organizations have experienced it too. In November 2022, the US National Institute of Standards and Technology reported CVE-2022-37332, a Use After Free vulnerability in the JavaScript engine of the PDF reader Foxit, version 12.0.1.124300. This security weakness made it possible for threat actors to craft a malicious PDF document that triggers the reuse of previously freed memory, which can lead to the execution of harmful code. A similar issue was discovered in the Windows Server VPN remote kernel in May 2022. This affected most versions of the Windows Server and desktop operating systems not newer than Windows Server 2008 and Windows 7. It allowed bad actors to conduct DoS attacks and initiate remote code execution against a target server. The iPhone operating system was also revealed to have this security weakness. In March 2022, upon the launch of iOS 15.4, security researchers revealed that Apple's then-latest iOS version was exposed to the possibility of arbitrary code execution with kernel privileges. This vulnerability mainly affected GPU drivers. Additionally, in January 2022, WordPress published a patch for an 'SQL Injection through WP_Query' vulnerability, which is related to the Use After Free security issue. WordPress rated this security concern as highly severe, as it made it possible for SQL injection to happen through WordPress themes and plugins. Resolving the Weakness The conventional solution to CWE-416 is security patching. The creators of affected software release software or firmware updates to plug security holes and ensure secure processes. However, this method of addressing the problem may not be timely enough to prevent aggressive cybercriminals from exploiting vulnerabilities as soon as they spot them. As such, it is advisable to employ security solutions that can undertake security scans similar to what Runtime Application Self-Protection (RASP) solutions do. A deterministic method of preventing memory and code manipulation is advisable, as it provides a blanket defense against a wide range of software vulnerabilities. A solution that is embedded within the code of a program or the firmware of an IoT device performs considerably better and faster in addressing memory-related threats. It scans the software for difficult-to-spot anomalous actions that only become apparent when they are already in the process of causing problems. It can contain a problem even when a security patch is not yet available. In Conclusion Use After Free is one of the two major memory allocation-related threats affecting C code. The other is Double Free (CWE-415), which happens when the free() function is improperly called more than once on the same memory. Both have the potential to cause serious security problems for organizations. What’s reassuring to know is that they are preventable with the right solutions and security strategies.
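To make the weakness less abstract, here is a minimal, deliberately broken C sketch (purely illustrative and hypothetical, not taken from any of the products mentioned above) showing how a Use After Free and a Double Free arise when a stale heap pointer keeps being used:

C
#include <stdio.h>
#include <stdlib.h>

/* A function pointer stored in heap memory, similar to the scenario described above. */
typedef void (*callback_t)(void);

static void expected_action(void) { puts("expected behavior"); }

int main(void) {
    /* Allocate heap memory that holds a function pointer. */
    callback_t *handler = malloc(sizeof *handler);
    if (handler == NULL) return 1;
    *handler = expected_action;

    free(handler);   /* The block is returned to the allocator... */

    /* ...but the stale pointer is still dereferenced (CWE-416, Use After Free).
       If an attacker can influence a later allocation that reuses this block,
       the stored function pointer can be overwritten and this call redirected
       to attacker-controlled code. */
    (*handler)();    /* Undefined behavior: use after free. */

    free(handler);   /* Undefined behavior: double free (CWE-415). */
    return 0;
}

The compiler accepts this code without complaint; tools such as AddressSanitizer or Valgrind are typically needed to flag the stale access at runtime, which is part of why the weakness so often goes unnoticed.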
In this article, learn how to improve your time series analysis capability by using the QuestDB Python package to ingest Pandas DataFrames. Introduction Pandas is an open-source data analysis and data manipulation library for Python that has become an essential tool for data scientists and analysts. It provides a simple and intuitive way to manipulate data, making it a popular choice for data analysis tasks. However, while Pandas is excellent for small to medium-sized datasets, it can struggle with large datasets that exceed the available memory of the machine it is running on. This is where QuestDB excels: it is designed specifically for high-performance operations in such scenarios, making it the go-to solution for demanding data analysis tasks. By loading Pandas DataFrames into QuestDB, we can leverage the powerful data processing capabilities of the database, allowing us to scale our analysis and data manipulation operations to large datasets. In this tutorial, we will learn how to load large Pandas DataFrames into QuestDB. We use the yellow and green taxi trip records published by the NYC Taxi and Limousine Commission as our data source. Prerequisites For this tutorial, it is recommended to have a basic understanding of Python and SQL. Also, you will need to have the following installed on your machine: Docker Getting the Data To Ingest Before we begin loading the data into QuestDB, we need to obtain the data we will be working with. As mentioned above, we will use the NYC TLC’s records of yellow and green taxi trips. Let’s download the data: Create a new directory called pandas-to-questdb and a data directory inside that. Edit and execute the following command in your terminal to download the parquet files: Shell curl -L -o ./data/yellow_tripdata_2022-<MONTH>.parquet https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-<MONTH>.parquet Make sure you replace <MONTH> with the zero-padded number of the month you wish to download (between 01 and 11; the 12th month was not available at the time of writing). Now, we have the data to ingest. It is time to try loading it using Pandas. Loading Records Into Memory You may already have noticed that the downloaded files are in Parquet format. Parquet is a columnar storage format commonly used for big data processing. Parquet files are optimized for use with modern big data processing engines and provide efficient storage and retrieval of data compared to traditional row-based storage formats like CSV and JSON. Before being able to load any data, we will set up a simulated production environment where we can easily test what happens if Pandas cannot load the Parquet files into memory. In production, we often encounter memory constraints, and this environment can reflect that. Run the following command to create a new Docker container running with a 1GiB memory limit. If the container reaches that limit, Docker will kill it, or the OS will OOM-kill the process we are running: Shell docker run -it -m 1g -v "$(pwd)":/tutorial -w /tutorial --net host python:3.11.1-slim-bullseye /bin/bash Now, we have a Debian-based Python 3.11 Docker container. Let’s install our requirements.
Create a requirements.txt file with the content below: Plain Text pandas>=1.5.3 psycopg[binary]>=3.1.8 pyarrow>=11.0.0 questdb>=1.1.0 Now, execute pip install -r requirements.txt within the container. Pip will install the Python requirements. At this point, we have a test environment in which we can load the data. Create a new file called data_loader.py with the following content: Python # data_loader.py import pandas as pd df = pd.read_parquet("./data/yellow_tripdata_2022-01.parquet") print(df.head()) Now, execute it within the Docker container by running python data_loader.py. The program runs successfully and prints the first few rows of the DataFrame. We just loaded the taxi trip records for January 2022! Let’s try to load more data. Replace the content of data_loader.py with the code below to load all files from the data directory and execute the program again: Python # data_loader.py import os import glob import pandas as pd records = glob.glob(os.path.join("data", "*.parquet")) df = pd.concat((pd.read_parquet(r) for r in records), ignore_index=True) print(df.head()) When executing data_loader.py this time, you should get an error message: “Killed.” As you may assume, the OOM killer terminated the process. We were not able to load the dataset; therefore, we cannot work with it. We need a different approach. Ingesting to QuestDB In a new terminal window, start a QuestDB container by executing: Shell docker run --rm -it -p 8812:8812 -p 9009:9009 --net host --name questdb questdb/questdb The database is now ready to receive the data. Update data_loader.py to ingest data into QuestDB using the questdb package, which uses the InfluxDB Line Protocol (ILP) over TCP for maximum throughput. To handle large datasets, we will read the files one by one and transfer their contents to QuestDB. Then, we will use QuestDB to query the data and load the results back into Pandas DataFrames. Refactor the data loader based on the above: Python # data_loader.py import os import glob import pandas as pd from questdb.ingress import Sender def main(): files = glob.glob(os.path.join("data", "*.parquet")) with Sender("127.0.0.1", 9009) as sender: for file in files: df = pd.read_parquet(file) print(f"ingesting {len(df.index)} rows from {file}") sender.dataframe(df, table_name="trips", at="tpep_pickup_datetime") if __name__ == "__main__": main() Let’s start from the beginning. The first major change you’ll notice is that we specify QuestDB’s hostname and ILP port number when creating the Sender. Then we iterate over the Parquet files and load them into memory using Pandas. After that, using QuestDB’s Python client, we ingest the DataFrames directly into QuestDB. In the Python container, run python data_loader.py. The script will ingest one Parquet file at a time. Working With Trip Data So far, we have prepared the dataset and loaded it into QuestDB. It’s time to execute some queries and load the results into DataFrames. Using the whole dataset, we want to know the average total amount paid, grouped by passenger count.
Create a new file called query_amount.py with the following content: Python # query_amount.py import pandas as pd import psycopg QUERY = """ SELECT passenger_count, avg(total_amount) FROM 'trips' WHERE passenger_count > 0 GROUP BY passenger_count """ if __name__ == "__main__": conn = psycopg.connect( dbname="questdb", host="127.0.0.1", user="admin", password="quest", port=8812, ) df = pd.read_sql_query(QUERY, conn) print(df.head(10)) Similar to the data loader script, this script specifies the host and port. Here, though, we use the PostgreSQL Python client (psycopg) to connect to QuestDB over its PostgreSQL wire protocol. In the Python container, execute python query_amount.py: When the script finishes, you should see the average total amount paid for each passenger count. Interestingly, there is a huge difference in the average between passenger counts of 6 and 7: the average is almost 2.5x higher for 7 passengers than for 6. Further analysis might reveal the root cause of this jump, but it is probably down to human nature: we like to share the cost of rides when we are going on a longer trip. Summary In this tutorial, we have learned how to load large datasets into QuestDB using Pandas DataFrames. By transferring data from Pandas to QuestDB, we have taken advantage of the database’s powerful data processing capabilities, enabling us to scale our analysis and data manipulation operations to handle large datasets. The approach outlined in this tutorial is just one way to work with big data using Pandas and QuestDB. You can customize this method to suit your specific needs and continue to explore the possibilities of these powerful tools. The end goal is to make data analysis and manipulation easier and more efficient, regardless of the size of the dataset.
Companies are in continuous motion: new requirements, new data streams, and new technologies are popping up every day. When designing new data platforms to support the needs of your company, failing to perform a complete assessment of the options available can have disastrous effects on a company’s capability to innovate and to make sure its data assets are usable and reusable in the long term. Having a standard assessment methodology is an absolute must to avoid personal bias and properly evaluate the various solutions across all the needed axes. The SOFT Methodology provides a comprehensive guide to all the evaluation points needed to define robust and future-proof data solutions. However, the original blog doesn’t discuss a couple of important factors: why is applying a methodology like SOFT important? And, even more, what risks do we run if we don’t? This blog aims to cover both aspects. The Why Data platforms are here to stay: the recent history of technology has told us that data decisions made now have a long-lasting effect. We commonly see frequent rework of the front end, but radical changes in the back-end data platforms used are rare. Front-end rework can radically change the perception of a product, but when the same is done on the backend, the changes do not immediately impact the end users. Changing the product provider is nowadays quite frictionless, but porting a solution across different backend tech stacks is, despite the eternal promise, very complex and costly, both financially and time-wise. Some options exist to ease the experience, but code compatibility and performance are never a 100% match. Furthermore, when talking about data solutions, performance consistency is key. Any change in the backend technology is therefore seen as a high-risk scenario, and most of the time refused with the statement “don’t fix what isn’t broken." The fear of change blocks both new tech adoption and upgrades of existing solutions. In summary, the world has plenty of examples of companies using backend data platforms chosen ages ago, sometimes with old, unsupported versions. Therefore, any data decision made today needs to be robust and age well in order to support the company in its future data growth. Having a standard methodology helps you understand the playing field, evaluate all the possible directions, and accurately compare the options. The Risks of Being (Data) Stuck Ok, you’re in the long-term game now. Swapping back-end or data pipeline solutions is not easy; therefore, selecting the right one is crucial. But what problems will we face if we fail in our selection process? What are the risks of being stuck with a sub-optimal choice? Features When thinking about being stuck, it’s tempting to compare the chosen solution with the new and shiny tooling available at the moment, and their promised future features. New options and functionalities could enhance a company’s productivity, system management, and integration, and remove friction at any point of the data journey. Being stuck with a suboptimal solution, without a clear innovation path and without any capability to influence its direction, puts the company in a potentially weak position regarding innovation. Evaluating the community and the vendors behind a certain technology can help decrease the risk of stagnating tools. It’s very important to evaluate which features and functionalities are relevant/needed and define a list of “must-haves” to reduce time spent on due diligence.
Scaling The SOFT methodology blog post linked above touches on several directions of scaling: human, technological, business case, and financial. Hitting any of these problems could mean that the identified solution: Might not be supportable because of a lack of talent Could hit technical limits and prevent growth Could expose security/regulatory problems Could be perfectly fine to run in a sandbox, but financially impractical on production-size data volumes Hitting scaling limits, therefore, means that companies adopting a specific technology could be forced to either slow down growth or completely rebuild solutions starting from a different technology choice. Support and Upgrade Path Sometimes the chosen technology advances, but companies are afraid of, or can’t find the time/budget for, upgrading to the new version. The associated risk is that the older the software version, the more complex (and risky) the upgrade path will be. In exceptional circumstances, the upgrade path may not exist at all, forcing a complete re-implementation of the solution. Support needs a similar discussion: staying on a very old version could mean a premium support fee in the best case, or a complete lack of vendor/community help in the vast majority of scenarios. Community and Talent The risk associated with talent shortage was already covered in the scaling section. New development and workload scaling heavily depend on the humans behind the tool. Moreover, not evaluating the community and talent pool behind a certain technology decision could create support problems once the chosen solution becomes mature and the first set of developers/supporters leaves the company without proper replacement. The lack of a vibrant community around a data solution could rapidly shrink the talent pool, creating issues for new features, new developments, and existing support. Performance It’s impossible to know what the future will hold in terms of new technologies and integrations. But selecting a closed solution with limited (or no) integration capabilities forces companies to run only “at the speed of the chosen technology,” exposing them to the risk of not being able to unleash new use cases because of technical limitations. Moreover, not paying attention to the speed of development and recovery could expose limits on the innovation and resilience fronts. Black Box When defining new data solutions, an important aspect is the ability to make data assets and related pipelines discoverable and understandable. Dealing with a black-box approach exposes companies to repeated efforts and inconsistent results, which decrease trust in the solution and open the door to misaligned results across departments. Overthinking The opposite risk is overthinking: the more time spent evaluating solutions, the more technologies, options, and needs will pile up, making the final decision process even longer. An inventory of the needs, timeframes, and acceptable performance is necessary to reduce the scope, make a decision, and start implementing. Conclusion When designing a data platform, it is very important to address the right questions and avoid the “risk of being stuck." The SOFT Methodology aims to provide all the important questions you should ask yourself in order to avoid pitfalls and create a robust solution. Do you feel all the risks are covered? Have a different opinion? Let me know!
In this article, I demonstrate how you can migrate a comprehensive web application from Oracle Database to YugabyteDB using the open-source data migration engine YugabyteDB Voyager. To improve the availability, scalability, and performance of their applications, many developers are migrating from traditional, single-server relational databases to distributed database clusters. Migrating to YugabyteDB allows engineers to use the well-known SQL interface, while also benefiting from the data resiliency and performance characteristics of distributed databases. Whether you prefer reading or watching, let’s migrate a sample movie rental application to YugabyteDB. YugaRentals Application I’ve developed an application called YugaRentals, built to run on both Oracle Database and YugabyteDB. The YugaRentals application is a movie rental dashboard application that leverages the popular Sakila schema. This application displays the most recent movie rentals, plus a list of rental sales by film category. Let’s start by deploying and connecting to an Amazon RDS for Oracle instance. Later, we’ll migrate our data to a multi-node YugabyteDB Managed cluster. Getting Started With Oracle Database We could run Oracle Database in the Oracle Cloud, or locally using Docker or a platform-specific installation. In this case, I’m going to demonstrate how to migrate a database hosted on Amazon RDS to YugabyteDB Managed. Setting Up Amazon RDS for Oracle I’ve deployed an instance on Amazon RDS for Oracle and configured it to be accessible from the machine running the YugaRentals application. Connecting YugaRentals to Oracle Database in Node.js Connecting to our Oracle Database instance in the cloud from Node.js requires some additional configuration. The node-oracledb package can be used to connect to our database. This package requires the installation of the Oracle Client libraries for your platform. This quick start guide is helpful for first-time users. After successfully installing the npm package for node-oracledb and the client libraries for Oracle Database v19, we can use the following code snippet (run inside an async function, since it uses await) to connect to our instance. JavaScript // index.js const oracledb = require("oracledb"); oracledb.initOracleClient({ libDir: process.env.ORACLE_CLIENT_LIB_PATH }); let connection = await oracledb.getConnection({ user: process.env.DB_USER, password: process.env.DB_PASSWORD, connectionString: `${process.env.DB_HOST}/${process.env.DB_NAME}` }); console.log("Successfully connected to Oracle Database"); By setting environment variables with our instance configuration and running the application, we can confirm our database is configured properly. Shell > DB_USER=admin \ DB_PASSWORD=foobar \ DB_HOST=[AMAZON_RDS_HOST] \ DB_NAME=ORCL \ ORACLE_CLIENT_LIB_PATH=/path/to/client/lib \ node index.js Server running on port 8000 Successfully connected to Oracle Database After verifying that our Oracle Database is running in the cloud, we can begin migrating to YugabyteDB Managed. Setting Up YugabyteDB Managed It takes less than five minutes to get started with YugabyteDB Managed. First, create an account; then follow the steps to create a YugabyteDB cluster. I've chosen to deploy a three-node cluster to AWS, in the us-west-2 region. This configuration will provide fault tolerance across availability zones.
Add your IP address to the cluster allow list so that you can connect from your machine to the remote database, then download the database credentials before creating your cluster. Once our cluster has been deployed, we’re ready to begin migrating with YugabyteDB Voyager. Migrating to YugabyteDB With YugabyteDB Voyager Having verified our Oracle Database deployment, it's time to migrate from Amazon RDS for Oracle, to YugabyteDB using the YugabyteDB Voyager CLI. YugabyteDB Voyager is a powerful, open-source, data-migration engine, which manages the entire lifecycle of data migration. Prepare the Databases After installing YugabyteDB Voyager, we begin by creating users in our source and target databases and granting them roles. This is how you could use the previously-installed Oracle Client library SQL*Plus. Shell > sqlplus 'admin@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=AMAZON_RDS_HOST)(PORT=1521))(CONNECT_DATA=(SID=ORCL)))' // SQL Shell SQL > 1. Create a role with read access privileges. SQL CREATE ROLE ADMIN_reader_role; BEGIN FOR R IN (SELECT owner, object_name FROM all_objects WHERE owner='ADMIN' and object_type in ('VIEW','SEQUENCE','TABLE PARTITION','TABLE','SYNONYM','MATERIALIZED VIEW')) LOOP EXECUTE IMMEDIATE 'grant select on '||R.owner||'."'||R.object_name||'" to ADMIN_reader_role'; END LOOP; END; / BEGIN FOR R IN (SELECT owner, object_name FROM all_objects WHERE owner='ADMIN' and object_type = 'TYPE') LOOP EXECUTE IMMEDIATE 'grant execute on '||R.owner||'."'||R.object_name||'" to ADMIN_reader_role'; END LOOP; END; / GRANT SELECT_CATALOG_ROLE TO ADMIN_reader_role; GRANT SELECT ANY DICTIONARY TO ADMIN_reader_role; GRANT SELECT ON SYS.ARGUMENT$ TO ADMIN_reader_role; 2. Create a user and grant roles. SQL CREATE USER ybvoyager IDENTIFIED BY password; GRANT CONNECT TO ybvoyager; GRANT ADMIN_reader_role TO ybvoyager; 3. Repeat this process for the YugabyteDB Managed instance by connecting via the Cloud Shell: SQL // Optionally, you can create a database for import. Otherwise, the target database will default to 'yugabyte'. yugabyte=> CREATE USER ybvoyager PASSWORD 'password'; yugabyte=> GRANT yb_superuser TO ybvoyager; Export the Schema Our source and target databases are now equipped to use Voyager. 1. Create an export directory and an associated environment variable. It will house schema and data files, as well as logs, metadata, and schema analysis reports. Shell > mkdir ~/export-dir > export EXPORT_DIR=$HOME/export-dir 2. Export the schema from Oracle Database. Shell > yb-voyager export schema --export-dir ~/export-dir \ --source-db-type oracle \ --source-db-host ORACLEDB_HOST \ --source-db-user ybvoyager \ --source-db-password password \ --source-db-name ORCL \ --source-db-schema ADMIN ... export of schema for source type as 'oracle' oracle version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production exporting TYPE done exporting SEQUENCE done exporting TABLE done exporting PARTITION done exporting PACKAGE done exporting VIEW done exporting TRIGGER done exporting FUNCTION done exporting PROCEDURE done exporting MVIEW done exporting SYNONYM done Exported schema files created under directory: /export-dir/schema 3. Analyze the exported schema. Shell > yb-voyager analyze-schema --export-dir ~/export-dir --output-format txt -- find schema analysis report at: /export-dir/reports/report.txt By analyzing our schema before exporting data, we have the option to make any necessary changes to our DDL statements. 
The schema analysis report will flag any statements which require manual intervention. The generated schema analysis report shows that the exported schema can be migrated without any changes. Good news! However, this is a chance to make schema optimizations based on our knowledge of the differences between PostgreSQL and Oracle Database. Optimize Schema Oracle Database uses triggers with sequences to set values for auto-incrementing columns if no value is supplied. In PostgreSQL, we can access the sequence to set a default directly in the table definition, like so: SQL CREATE TABLE city ( city_id INT DEFAULT nextval('city_sequence') NOT NULL ... ) After optimizing the table definition, we can edit the associated trigger. SQL //NEW CREATE OR REPLACE FUNCTION trigger_fct_city_before_trigger() RETURNS trigger AS $BODY$ BEGIN NEW.last_update:=current_date; RETURN NEW; END $BODY$ LANGUAGE 'plpgsql'; //OLD CREATE OR REPLACE FUNCTION trigger_fct_city_before_trigger() RETURNS trigger AS $BODY$ BEGIN IF (NEW.city_id IS NULL) THEN SELECT nextval('city_sequence') INTO STRICT NEW.city_id ; END IF; NEW.last_update:=current_date; RETURN NEW; END $BODY$ LANGUAGE 'plpgsql'; These optimizations can be applied to many of our tables. I’ve created a shell script, which can be executed to make these schema changes automatically. Here’s how you can execute it from the command line: Shell > ./optimize_schema.sh export_dir=$HOME/export-dir Export Data Now that we’ve optimized our schema, it’s time to export our data. Export the data from Oracle Database. Shell > yb-voyager export data --export-dir ~/export-dir \ --source-db-type oracle \ --source-db-host ORACLEDB_HOST \ --source-db-user ybvoyager \ --source-db-password password \ --source-db-name ORCL \ --source-db-schema ADMIN export of data for source type as 'oracle' Num tables to export: 17 table list for data export: [ACTOR ADDRESS CATEGORY CITY COUNTRY CUSTOMER FILM FILM_ACTOR FILM_CATEGORY FILM_TEXT INVENTORY LANGUAGE PAYMENT RENTAL STAFF STORE TODOITEM] calculating approx num of rows to export for each table... Initiating data export. Data export started. Exported tables:- {ACTOR, ADDRESS, CATEGORY, CITY, COUNTRY, CUSTOMER, FILM_ACTOR, FILM_CATEGORY, FILM_TEXT, INVENTORY, LANGUAGE, STAFF, PAYMENT, STORE, RENTAL, TODOITEM, FILM} TABLE ROW COUNT ACTOR 200 ADDRESS 603 CATEGORY 16 CITY 600 COUNTRY 109 CUSTOMER 599 FILM 1000 FILM_ACTOR 5462 FILM_CATEGORY 1000 FILM_TEXT 0 INVENTORY 4581 LANGUAGE 6 PAYMENT 16049 RENTAL 16049 STAFF 2 STORE 2 Export of data complete ✅ After successfully exporting our schema and data, we're ready to move our database to YugabyteDB Managed. Import Schema and Data to YugabyteDB 1. Import the schema to YugabyteDB Managed. As you can see from the terminal output, I’ve chosen to import to the admin schema in the default yugabyte database. Shell > yb-voyager import schema --export-dir ~/export-dir \ --target-db-host YUGABYTEDB_MANAGED_HOST \ --target-db-user ybvoyager \ --target-db-password password \ --target-db-schema admin \ --target-ssl-mode require 2. Import the data to YugabyteDB Managed. YugabyteDB Voyager makes quick work of this data import with the use of parallel jobs. Shell > yb-voyager import data --export-dir ~/export-dir \ --target-db-host YUGABYTEDB_MANAGED_HOST \ --target-db-user ybvoyager \ --target-db-password password \ --target-db-schema admin \ --target-ssl-mode require import of data in "yugabyte" database started Using 2 parallel jobs by default. 
Use --parallel-jobs to specify a custom value Preparing to import the tables: [ACTOR ADDRESS CATEGORY CITY COUNTRY CUSTOMER FILM FILM_ACTOR FILM_CATEGORY FILM_TEXT INVENTORY LANGUAGE PAYMENT RENTAL STAFF STORE] All the tables are imported 3. Import indexes and triggers. Shell > yb-voyager import schema --export-dir ~/export-dir \ --target-db-host YUGABYTEDB_MANAGED_HOST \ --target-db-user ybvoyager \ --target-db-password password \ --target-db-schema admin \ --target-ssl-mode require \ --post-import-data INDEXES_table.sql: CREATE INDEX actor_last_name ON actor (last_name); INDEXES_table.sql: CREATE INDEX address_city_id ON address (city_id); INDEXES_table.sql: CREATE INDEX city_country_id ON city (country_id); ... 4. Change ownership of the imported objects. We no longer need the ybvoyager user in YugabyteDB Managed. In the YugabyteDB Managed Cloud Shell, run: SQL > REASSIGN OWNED BY ybvoyager TO admin; > DROP OWNED BY ybvoyager; > DROP USER ybvoyager; It's time to verify that our database was successfully migrated to YugabyteDB Managed by changing the connection settings in the YugaRentals application. Connecting YugaRentals to YugabyteDB Managed in Node.js YugaRentals was originally developed to run on Oracle Database, but I also added support for PostgreSQL. Since YugabyteDB is PostgreSQL-compatible, we can use the "node-postgres" driver for Node.js to connect to our YugabyteDB Managed cluster. In fact, Yugabyte has developed its own smart drivers, which add load-balancing capabilities to native drivers. This can improve performance by avoiding excessive load on any single cluster node. After installing the YugabyteDB node-postgres smart driver, we're ready to connect to our database. Shell > DB_TYPE=yugabyte \ DB_USER=admin \ DB_HOST=YUGABYTEDB_MANAGED_HOST \ DB_PASSWORD=adminpassword \ DB_NAME=yugabyte \ SSL_MODE=true \ node index.js Server running on port 8000 Successfully connected to YugabyteDB The YugaRentals application functions just the same as before. We can see the latest rentals, which were originally stored in Oracle Database. By running the simulation to generate new rentals, we can verify that our migration to YugabyteDB Managed was successful! Wrapping Up As you can see, YugabyteDB Voyager simplifies the migration from Oracle Database to YugabyteDB. Whether you’re migrating from Oracle or another relational database like PostgreSQL or MySQL, I encourage you to give it a try in your next project. Look out for more articles from me on distributed SQL and Node.js. In the meantime, keep on coding!
Elasticsearch is a highly scalable, open-source search engine and analytics platform designed to handle large amounts of data. It is built on top of Apache Lucene, a high-performance text search engine, and provides a distributed and easy-to-use solution for storing, searching, and analyzing large volumes of data. In this article, we will explore the use of Elasticsearch and its key features, including indexing, searching, and aggregations. Indexing One of the most important features of Elasticsearch is its ability to index data. The indexing API is simple to use and accepts JSON documents, which are then stored in an index. An index is a collection of documents that share similar characteristics, and can be thought of as a table in a relational database. For example, you can create an index for customer information, another for product information, and so on. Example To index a document into Elasticsearch, you can use the following command: JSON PUT /customer/doc/1 { "first_name": "John", "last_name": "Doe", "age": 35, "email": "john.doe@example.com" } Searching Another important feature of Elasticsearch is its ability to search data. The search API is rich and flexible, allowing you to search for documents based on full-text, keyword, and numeric fields. You can also apply filters, facets, and aggregations to your search queries to get more advanced results. Example To search for all documents that contain the word “John” in the first_name field, you can use the following command: JSON GET /customer/_search { "query": { "match": { "first_name": "John" } } } Aggregations In addition to searching, Elasticsearch provides a powerful aggregation framework that enables you to perform complex data analysis. Aggregations can be used to calculate statistics, such as the average, sum, or count of values, or perform complex operations, such as finding the most frequently used words in a set of documents. Example To find the average age of all customers, you can use the following command: JSON GET /customer/_search { "size": 0, "aggs": { "avg_age": { "avg": { "field": "age" } } } } Complex Search Use Cases Geo-Spatial Search Elasticsearch provides support for geo-spatial search, enabling you to search for documents based on their geographic location. Example You can search for all customers located within a certain distance from a given location: JSON GET /customer/_search { "query": { "bool": { "must": { "match_all": { } }, "filter": { "geo_distance": { "distance": "10km", "location": { "lat": 40.748817, "lon": -73.985428 } } } } } } Faceted Search Faceted search is a popular search paradigm that enables users to filter search results based on specific criteria. In Elasticsearch, you can use the aggregation framework to perform a faceted search, which allows you to group your data into categories and then calculate statistics for each category. Example Suppose you have an e-commerce website that sells books, and you want to allow users to filter books by category and price range. You can use the following command to perform a faceted search that returns the number of books in each category and price range: JSON GET /books/_search { "size": 0, "aggs": { "categories": { "terms": { "field": "category" } }, "price_ranges": { "range": { "field": "price", "ranges": [ { "to": 50 }, { "from": 50, "to": 100 }, { "from": 100 } ] } } } } Multifield Search In some cases, you may want to search multiple fields at once. Example You may want to search for books that match the author’s name or the title.
In Elasticsearch, you can use the multi-match query to search multiple fields at once: JSON GET /books/_search { "query": { "multi_match": { "query": "The Great Gatsby", "fields": [ "title", "author" ] } } } Nested Objects Search In Elasticsearch, you can store nested objects within a document. Example You can store multiple authors for a book or multiple addresses for a customer. To search for documents that contain specific nested objects, you can use the nested query: JSON GET /books/_search { "query": { "nested": { "path": "authors", "query": { "match": { "authors.name": "F. Scott Fitzgerald" } } } } } Conclusion Elasticsearch is a powerful tool for managing, storing, and analyzing large volumes of data. Its rich API and aggregation framework make it possible to perform complex data analysis, including full-text search, faceted search, and geo-spatial search. Whether you are building a search engine, an e-commerce platform, or a data analytics application, Elasticsearch provides a scalable and flexible solution for your needs.