Whenever tech companies change their business focus, they face significant technology challenges linked to agility, compliance, maintainability, scalability, and other software quality concerns. These problems are only amplified when a company experiences hypergrowth. The good news is that application modernization can counter these challenges while avoiding a major rewrite of the system (or at least parts of it) by improving the architecture and keeping architectural and design technical debt in check. For these reasons, application modernization is quickly growing in popularity: Konveyor's May 2022 report revealed that around 54% of businesses plan to adopt the process within a year.

Missed deadlines, longer cycle times to add new features, an inability to scale, and higher defect slippage often lead to discussions around the need for application modernization. Legacy technologies, badly designed modules, poor architecture choices, and poor code quality are typically the reasons for these challenges, and re-architecture, rewriting modules, and application modernization are the recommended solutions. But an application modernization initiative should not be launched without serious consideration. In some cases, a focused effort on tech debt reduction for 3-4 months can solve the pressing challenges, or rewriting a few modules over 6-9 months might be all that's necessary. In other cases, however, application modernization is the only approach that will work.

How do you determine whether application modernization is the best route? A solid business case will ensure you spend resources on the right move. But what data points do you need to prepare such a business case? Most frameworks for application modernization have two common steps:

1. Tech Due Diligence: This involves evaluating the application's architecture, technology stack, and business value. The goal is to determine whether the application can be modernized or whether it should be replaced entirely.
2. Modernization Roadmap: This involves creating a detailed plan that outlines the approach, timelines, and resources required for modernization.

Based on my experience creating these artifacts, below are the key details that should be considered for both.

Essentials for Technical Due Diligence

To get a fresh perspective, avoid including reviewers from the original team, and speak to the stakeholders who were involved in earlier decision-making to understand the past business objectives and constraints (both short-term and long-term), the challenges in the current architecture, the current organizational structure, and the development processes. Your tech due diligence must cover the following:

High-Level Architecture

For a 360-degree view of the system, collect information about the following:

Key Architecture Principles: Key architecture principles and decisions are easy to gather as Confluence pages or Architecture Decision Records (ADRs) if your team has a practice of documenting them.
Tools, Technologies, and Frameworks: Along with the cloud platforms, rule engines, and web servers used, get a detailed understanding of the frameworks and libraries used in the system.
Standards and Guidelines: Collate the standards and guidelines followed for technologies like the frontend, APIs, databases, etc.
Non-functional Aspects: Gather information on non-functional aspects like performance, security, and compliance.

Component Design

Most of the time, system components' designs are not documented.
However, a code walkthrough for a use case can help you understand the design.

Design Principles: Find out whether good design principles like SOLID and Domain-Driven Design, and appropriate design patterns, were used.
Data Stores: List all the different data stores used in the product, like RDBMS, key-value stores, and document stores, along with their purpose.
Data Modeling: Each type of data store has different modeling techniques. Review them to check whether the right techniques were used.
Data Privacy: Closely observe the techniques used for data privacy and look for vulnerabilities.

Dev Processes

This step is to understand the processes followed, along with the execution challenges and product quality, both functional and non-functional.

Processes: Clearly understand new feature development, change management, problem management, release management, etc.
Execution Challenges: Teams typically know the problems they face during development, deployment, planning, delivery, decision-making, team balance, and team motivation. You should review these aspects to unearth problems in any of them.
Bug Reports: The bug reports cover trends, analysis, Root Cause Analysis (RCA), and the first-time-right rate.

Team Capability

Technical leads are often unaware of an individual's technical strengths and weaknesses. To counter this challenge, you must find out which developers are highly productive, write good-quality code consistently, and strive to get things right the first time (i.e., build user stories with minimal bugs).

Infrastructure

Gather data on the current cost and the cost trends for the past year, along with other key metrics the team collects, like MTTR, MTTF, uptime, release frequency, rollback frequency, etc.

To summarize, the Tech Due Diligence Report lists the problems and challenges of the current system by considering all aspects: high-level architecture, component design, dev processes followed, team capability, and infrastructure.

Essentials for a Modernization Roadmap

The roadmap helps you de-risk the modernization. Here are the key inputs necessary to create it:

Tech due diligence report.
Future business objectives and constraints.
Planned product roadmap (new features that need to be added to the application).
Non-functional requirements that the system needs to support in the near future and beyond.

Based on these inputs, develop a target architecture that meets most of your requirements. The target architecture needs to meet business goals, which you can divide into two broader categories:

1. Business Sustenance: The ability of the system to:
Add new features quickly at a scale where the development team size is more than 100. As the development team grows, the coordination effort required between different teams also grows manifold, and as a consequence, the cycle time increases.
Detect and fix bugs quickly within committed SLAs.
2. Business Growth: The ability of the system to:
Scale linearly with business growth.
Be compliant with laws and regulations.
Be always available with high reliability.
Be secure.

Apart from these, the system must also support software quality attributes like portability, accessibility, localization, etc. The proposed technology roadmap must cover not only the technical/architectural changes but also the cultural and process changes, while staying aligned with the following:

Development Processes: For planning, execution, code quality, product quality, deployment, documentation, and release, with a feedback loop and continuous improvement baked in.
Tech Debt Management: Processes to be followed to ensure tech debt is kept under control and cleared at regular intervals.
Team Structure: To support development with increased productivity and high morale.

Advantages of a Data-Driven Decision

By gathering and considering all these data points, you ensure the following:

Well-defined, measurable goals for modernization.
Scores on key architectural areas, with explanations.
Gaps and risks in the current architecture, design, and code.
A target architecture, along with the key architecture principles and the reasons for the recommended choices.

You can also produce a Tech Due Diligence report at regular intervals to maintain a 360-degree view of your product's technology across all dimensions. Doing this can extend your product's life and prevent having to scrap the product in the long run.
I explained the concepts and theory behind Data Residency in a previous post. It's time to get our hands dirty and implement it in a simple demo.

The Sample Architecture

In the last section of the previous post, I proposed a sample architecture where location-based routing happens at two different stages:

The API Gateway checks for an existing X-Country header. Depending on its value, it forwards the request to the computed upstream; if no value is found or no value matches, it forwards it to a default upstream.
The application uses Apache ShardingSphere to route again, depending on the data. If the value computed by the API Gateway is correct, the flow stays "in its lane"; if not, it's routed to the correct database, but with a performance penalty as it's outside its lane.

I simplified some aspects:

The theory uses two API Gateway instances. For the demo, I used only one.
Remember that the location isn't set client-side on the first request. It should be returned along with the first response, stored, and reused by the client on subsequent calls. I didn't bother with implementing the client.
I like my demos to be self-contained, so I didn't use any Cloud Provider.

Here's the final component diagram:

The data model is simple: We insert location-specific data on each database:

SQL
INSERT INTO europe.owner VALUES ('dujardin', 'fr', 'Jean Dujardin');
INSERT INTO europe.thingy VALUES (1, 'Croissant', 'dujardin');
INSERT INTO usa.owner VALUES ('wayne', 'us', 'John Wayne');
INSERT INTO usa.thingy VALUES (2, 'Lasso', 'wayne');

Finally, we develop a straightforward RESTful API to fetch thingies:

GET /thingies/
GET /thingies/{id}

Now that we have set the stage, let's check how to implement routing at the two levels.

Routing on Apache ShardingSphere

Apache ShardingSphere offers two approaches: as a library inside the application, ShardingSphere-JDBC, or as a full-fledged deployable component, ShardingSphere-Proxy. You can also combine both. I chose the former because it's the easiest to set up. For a comparison between them, please check this table. The first step is to add the dependency to the POM:

XML
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-jdbc-core</artifactId>
    <version>5.3.2</version>
</dependency>

ShardingSphere-JDBC acts as an indirection layer between the application and the data sources. We must configure the framework to use it. For Spring Boot, it looks like the following:

YAML
spring:
  datasource:
    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver #1
    url: jdbc:shardingsphere:absolutepath:/etc/sharding.yml                  #2

1. JDBC-compatible ShardingSphere driver
2. Configuration file

Contrary to what the documentation says, the full prefix is jdbc:shardingsphere:absolutepath. I've opened a PR to fix the documentation.
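Because ShardingSphere-JDBC is exposed as a regular JDBC driver, the indirection is not tied to Spring Boot. As a quick illustration (not part of the original demo), any plain JDBC code pointing at the same configuration file gets the same routing; here, the owner's name is read by column index since the demo only shows the inserted values, not the column names:

Kotlin
import java.sql.DriverManager

fun main() {
    // The ShardingSphere driver reads the data sources and sharding rules from the referenced YAML file
    DriverManager.getConnection("jdbc:shardingsphere:absolutepath:/etc/sharding.yml").use { connection ->
        connection.createStatement().use { statement ->
            // Without a country filter, ShardingSphere queries both shards and merges the results
            val resultSet = statement.executeQuery("SELECT * FROM owner")
            while (resultSet.next()) {
                println(resultSet.getString(3)) // third column holds the owner's name in the demo data
            }
        }
    }
}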
The next step is to configure ShardingSphere itself with the data sources:

YAML
dataSources:                                            #1
  europe:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: org.postgresql.Driver
    jdbcUrl: "jdbc:postgresql://dbeurope:5432/postgres?currentSchema=europe"
    username: postgres
    password: root
  usa:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: org.postgresql.Driver
    jdbcUrl: "jdbc:postgresql://dbusa:5432/postgres?currentSchema=usa"
    username: postgres
    password: root
rules:                                                  #2
  - !SHARDING
    tables:
      owner:                                            #3
        actualDataNodes: europe.owner,usa.owner         #4
        tableStrategy:
          standard:
            shardingColumn: country                     #3
            shardingAlgorithmName: by_country           #5
    shardingAlgorithms:
      by_country:
        type: CLASS_BASED                               #6
        props:
          strategy: STANDARD
          algorithmClassName: ch.frankel.blog.dataresidency.LocationBasedSharding #7

1. Define the two data sources, europe and usa
2. Define rules. Many rules are available; we will only use sharding to split data between the Europe and USA locations
3. Sharding happens on the country column of the owner table
4. Actual shards
5. Algorithm to use. ShardingSphere offers a couple of algorithms out of the box, which generally try to balance data equally between the sources. As we want a particular split, we define our own
6. Set the algorithm type
7. Reference the custom algorithm class

The final step is to provide the algorithm's code:

Kotlin
class LocationBasedSharding : StandardShardingAlgorithm<String> { //1
    override fun doSharding(targetNames: MutableCollection<String>, shardingValue: PreciseShardingValue<String>) =
        when (shardingValue.value) {                              //2
            "fr" -> "europe"
            "us" -> "usa"
            else -> throw IllegalArgumentException("No sharding over ${shardingValue.value} defined")
        }
}

1. Inherit from StandardShardingAlgorithm, where T is the data type of the sharding column. Here, it's country
2. Based on the sharding column's value, return the name of the data source to use

With all of the above, the application will fetch thingies from the relevant data source based on the owner's country.

Routing on Apache APISIX

We should route as early as possible to avoid an application instance in Europe fetching US data. In our case, it translates to routing at the API Gateway stage. I'll use APISIX standalone mode for configuration. Let's define the two upstreams:

YAML
upstreams:
  - id: 1
    nodes:
      "appeurope:8080": 1
  - id: 2
    nodes:
      "appusa:8080": 1

Now, we shall define the routes where the magic happens:

YAML
routes:
  - uri: /thingies*                                     #1
    name: Europe
    upstream_id: 1
    vars: [["http_x-country", "==", "fr"]]              #2
    priority: 3                                         #3
  - uri: /thingies*                                     #4
    name: USA
    upstream_id: 2
    vars: [["http_x-country", "==", "us"]]
    priority: 2                                         #3
  - uri: /thingies*                                     #5
    name: default
    upstream_id: 1
    priority: 1                                         #3

1. Define the route to the Europe-located app
2. APISIX matches the HTTP methods, the URI, and the conditions. Here, the condition is that the X-Country header has the fr value
3. APISIX evaluates matching in priority order, starting with the highest priority. If the request doesn't match, e.g., because the header doesn't have the set value, it evaluates the next route in the priority list
4. Define the route to the USA-located app
5. Define a default route

The first request carries no header; APISIX forwards it to the default route, where ShardingSphere finds the data in the relevant data source. Subsequent requests set the X-Country header because the response to the first request carries the information, and the client has stored it. Remember that it's outside the scope of the demo.
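The client itself stays out of scope, but to make the header round-trip concrete, here is a minimal Kotlin sketch of what such a client could look like. It assumes the location comes back in an X-Country response header; that header name is my assumption, not something the demo implements:

Kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Hypothetical client: store the location returned with the first response and replay it afterwards
val client = HttpClient.newHttpClient()
var country: String? = null // persisted client-side after the first call

fun fetchThingy(id: Int): String {
    val builder = HttpRequest.newBuilder(URI.create("http://localhost:9080/thingies/$id"))
    country?.let { builder.header("X-Country", it) } // reuse the stored location if we already have it
    val response = client.send(builder.build(), HttpResponse.BodyHandlers.ofString())
    // assumption: the response exposes the computed location so the client can remember it
    country = response.headers().firstValue("X-Country").orElse(country)
    return response.body()
}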
In most cases, it's set to the correct location; hence, the request will stay "in its lane." If not, the configured routing will still find the data in the appropriate location, at the cost of the increased latency of fetching data from the other lane.

Observing the Flow in Practice

It's always a good idea to check that the design behaves as expected. We can use OpenTelemetry for this. For more information on how to set up OpenTelemetry in such an architecture, please refer to End-to-end tracing with OpenTelemetry. Note that Apache ShardingSphere supports OpenTelemetry but doesn't provide the binary agent. You need to build it from source. I admit I was too lazy to do it.

Let's start with a headerless request:

Shell
curl localhost:9080/thingies/1

It uses the default route defined in APISIX and returns the correct data, thanks to ShardingSphere. Now, let's set the country to fr, which is correct:

Shell
curl -H 'X-Country: fr' localhost:9080/thingies/1

APISIX correctly forwards the request to the Europe-located app. Finally, imagine a malicious actor changing the header to get their hands on data that is located in the US:

Shell
curl -H 'X-Country: us' localhost:9080/thingies/1

APISIX forwards it to the USA-located app according to the header. However, ShardingSphere still fetches the data from Europe.

Conclusion

In the previous post, I explained the concepts behind Data Residency. In this post, I implemented it within a simple architecture, thanks to Apache APISIX and Apache ShardingSphere. The demo simplifies reality, but it should be an excellent foundation for building your production-grade Data Residency architecture. The complete source code for this post can be found on GitHub.

To go further:

Apache ShardingSphere
Sharding YAML configuration
How to filter route by Nginx builtin variable
Data freshness, sometimes referred to as data timeliness, is the frequency with which data is updated for consumption. It is an important dimension of data quality because recently refreshed data is more accurate and, thus, more valuable. Since it is impractical and expensive to have all data refreshed on a near real-time basis, data engineers ingest and process most analytical data in batches with pipelines designed to update specific data sets at a similar frequency in which they are consumed. Red Ventures director of data engineering, Brandon Beidel, talked to us about this process saying: “We [would] start diving deep into discussions around data quality and how it impacted their day-to-day. I would always frame the conversation in simple business terms and focus on the who, what, when, where, and why. I’d especially ask questions probing the constraints on data freshness, which I’ve found to be particularly important to business stakeholders.” For example, a customer churn dashboard for a B2B SaaS company may only need to be updated once every seven days for a weekly meeting, whereas a marketing dashboard may require daily updates in order for the team to optimize its digital campaigns. Data freshness is important because the value of data decreases exponentially over time. The consequences of ignoring data freshness can be severe. One e-commerce platform lost around $5 million in revenue because their machine learning model that identified out-of-stock items and recommended substitutions was operating on thousands of temporary tables and stale data for six months. How To Measure Data Freshness for Data Quality As previously mentioned, the required level of data freshness is completely contextual to the use case. One way data teams measure data freshness is by the number of complaints they receive from their data consumers over a period of time. While this is a customer-focused approach, it is reactive and has serious disadvantages such as: Corroding data trust; Delaying decision-making and the pace of business operations; Requiring a human in the loop that is familiar with the data (not always the case when powering machine learning models); and If data is external and customer-facing, it creates a risk of churn. A better measurement is the data downtime formula (above), which more comprehensively measures the amount of time the data was inaccurate, missing, or otherwise erroneous. A proactive approach for measuring data freshness is to create service level agreements or SLAs for specific data pipelines. We’ve written a step-by-step guide for creating data SLAs, but in summary: Identify your most important data tables based on the number of reads/writes or their monetary impact on the business. Identify the business owners of those data assets. In other words, who will be most impacted by data freshness or other data quality issues? Ask them how they use their data and how frequently they access it. Create an SLA that specifies how frequently and when the data asset will be refreshed. Implement a means of monitoring when the SLA has been breached and measure how frequently the SLA has been met over a period of time. This can be done through data testing or by using a data observability platform. 
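To make the monitoring step concrete, here is a minimal Python sketch of how SLA adherence could be computed once you record when each refresh actually landed; the deadline and the numbers are made up for illustration:

Python
from datetime import datetime, timedelta

# Hypothetical refresh log: one entry per day with the time the table actually refreshed
refresh_log = {
    "2023-05-01": datetime(2023, 5, 1, 6, 45),
    "2023-05-02": datetime(2023, 5, 2, 8, 30),  # late refresh -> SLA breach
    "2023-05-03": datetime(2023, 5, 3, 7, 10),
}

SLA_DEADLINE = timedelta(hours=8)  # data must be refreshed by 8:00 am each day

def sla_adherence(log: dict[str, datetime]) -> float:
    """Share of days on which the refresh landed before the agreed deadline."""
    on_time = sum(
        1
        for day, refreshed_at in log.items()
        if refreshed_at - datetime.fromisoformat(day) <= SLA_DEADLINE
    )
    return on_time / len(log)

print(f"Freshness SLA met {sla_adherence(refresh_log):.1%} of the time")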
The end result should look something like, “The customer_360 dashboard met its daily data freshness SLA 99.5% of the time over the last 30 days, a 1% increase over the previous 30 days.” Data Freshness Challenges Data teams face numerous challenges in their data freshness quest as a result of the scale, speed, and complexity of data and data pipelines. Here are a few examples: Data sources are constantly changing: Whether internal or external, data engineers are rarely in control of the source emitting the desired data. Changes in schedule or schema can break data pipelines and create data freshness issues. Data consumption patterns change a lot, too: Strategies are adapted, metrics evolve, and departments are reorganized. Without capabilities such as data lineage, it can be difficult to understand what is a key asset (or upstream of an important data product in the context of a data mesh) and what is obsolete clutter. Outside of the smallest companies, identifying relevant data consumers and business stakeholders for each asset is also extremely challenging. This creates a communication chasm between the data and business teams. Data pipelines have a lot of failure points: The more complex moving parts a machine has, the more opportunities for it to break. Data platforms are no exception. The ingestion connector could break, the orchestration job could fail, or the transformation model could be updated incorrectly. Fixing data freshness issues takes a long time: Because there are so many moving parts, troubleshooting data freshness incidents can take data engineers hours–even days. The root cause could be hidden in endless blocks of SQL code, a result of system permission issues, or just a simple data entry error. Data Freshness Best Practices Once you have talked with your key data consumers and determined your data freshness goals or SLAs, there are a few best practices you can leverage to provide the best service or data product possible. The first step is to architect your data pipeline so that the goal is technically feasible (low latency). This is typically a data ingestion decision between batch, micro-batch, or stream processing. However, this could impact any decisions regarding complex transformation models or other data dependencies as well. At this point, you will want to consider layering approaches for detecting, resolving, and preventing data freshness issues. Let’s look at each in turn. Detecting Data Freshness Issues One of the simplest ways to start detecting data freshness issues is to write a data freshness check (test) using SQL rules. For example, let’s assume you are using Snowflake as your data warehouse and have integrated with Notification Services. You could schedule the following query as a Snowflake task which would alert you Monday through Friday at 8:00 am EST when no rows had been added to “your_table” once you have specified the “date_column” with a column that contains the timestamp when the row was added. SQL CREATE TASK your_task_name WAREHOUSE = your_warehouse_name SCHEDULE = 'USING CRON 0 8 * * 1-5 America/New_York' TIMESTAMP_INPUT_FORMAT = 'YYYY-MM-DD HH24:MI:SS' AS SELECT CASE WHEN COUNT(*) = 0 THEN SYSTEM$SEND_SNS_MESSAGE( 'your_integration_name', 'your_sns_topic_arn', 'No rows added in more than one day in your_table!' ) ELSE 'Rows added within the last day.' 
END AS alert_message FROM your_table WHERE date_column >= DATEADD(DAY, -1, CURRENT_DATE()); The query above looks at rows added, but you could instead use a similar statement to make sure there is at least something matching the current date. Of course, both of these simple checks can be prone to error. SQL CREATE TASK your_task_name WAREHOUSE = your_warehouse_name SCHEDULE = 'USING CRON 0 8 * * 1-5 America/New_York' TIMESTAMP_INPUT_FORMAT = 'YYYY-MM-DD HH24:MI:SS' AS SELECT CASE WHEN DATEDIFF (DAY, max(last_modified), current_timestamp()) > 0 THEN SYSTEM$SEND_SNS_MESSAGE( 'your_integration_name', 'your_sns_topic_arn', 'No rows added in more than one day in your_table!' ) ELSE 'Max modified date within the last day.' END AS alert_message FROM your_table; You could also use a dbt source freshness block: YAML sources: - name: your_source_name database: your_database schema: your_schema tables: - name: your_table freshness: warn_after: count: 1 period: day loaded_at_field: date_column These are great tools and tactics to use on your most important tables, but what about the tables upstream from your most important tables? Or what if you don't know the exact threshold? What about important tables you are unaware of or for which you failed to anticipate that a freshness check was needed? The truth is data freshness checks don't work well at scale (more than 50 tables or so). One of the benefits of a data observability platform with data lineage is that if there is a data freshness problem in an upstream table that then creates data freshness issues in dozens of tables downstream, you get one cohesive alert rather than disjointed pings telling you your modern data stack is on fire. Resolving Data Freshness Issues The faster you resolve data freshness incidents, the less data downtime and cost you incur. Solve the data freshness issue quickly enough, and it may not even count against your SLA. Unfortunately, this is the most challenging part of dealing with data freshness issues. As previously mentioned, data can break in a near-infinite number of ways. This leaves two options. You can manually hop from tab to tab, checking out the most common system, code, and data issues. However, this takes a lot of time and doesn't guarantee you find the root cause. Our recent survey found it took respondents an average of 15 hours to resolve data incidents once detected! A data observability platform can help teams resolve data freshness issues much quicker with capabilities such as data lineage, query change detection, correlation insights for things like empty queries, and more. Preventing Data Freshness Issues Unfortunately, bad data and data freshness issues are a fact of life for data teams. You can't out-architect bad data. However, you can reduce the number of incidents by identifying and refactoring your problematic data pipelines. Another option, which is a bit of a double-edged data freshness sword, is data contracts. Unexpected schema changes are one of the most frequent causes (along with failed Airflow jobs) of stale data. A data contract architecture can encourage software engineers to be more aware of how service updates can break downstream data systems and facilitate how they collaborate with data engineers. However, data contracts also prevent this bad data from landing in the data warehouse in the first place, which can leave the affected tables without fresh data at all, so the sword cuts both ways. The Bottom Line: Make Your Data Consumers Wildly Happy With Fresh Data When you flip a light switch, you expect there to be light.
When your data consumers visit a dashboard, they expect the data to be fresh–it’s a baseline expectation. Prevent those nasty emails and make your data consumers wildly happy by ensuring when they need the data, it is available and fresh. Good luck!
I have developed a small-scale application that concentrates on a straightforward business scenario of customer reviews for the purpose of illustrating various use cases. This application is implemented using the Spring Boot framework and communicates with a MySQL database via Spring Data JPA, with the code being written in Kotlin. It exposes a simple REST API featuring CRUD operations on reviews. Spoiler alert: The use cases illustrated in this article are intentionally simplistic, intended solely to showcase the integration with ShardingSphere functionalities; the problems discussed here can be solved in various ways and maybe even in better ways, so don't spend too much time thinking "why." So, without further ado, let's dive into the code. Here is the main entity: Kotlin @Entity @Table(name = "reviews") data class Review( var text: String, var author: String, @Column(name = "author_telephone") var authorTelephone: String? = null, @Column(name = "author_email") var authorEmail: String? = null, @Column(name = "invoice_code") var invoiceCode: String? = null, @Column(name = "course_id") var courseId: Int? = null ) : AbstractEntity() And we have the following REST API: Kotlin @RestController @RequestMapping("/api/v1/reviews") class ReviewController(val reviewService: ReviewService) { @GetMapping("/filter", params = ["author"]) fun getReviewsByAuthor(@RequestParam("author") author: String): ResponseEntity<List<Review>> { val reviews = reviewService.findAllByAuthor(author) return ResponseEntity.ok(reviews) } @GetMapping("/filter", params = ["courseId"]) fun getReviewsByCourseId(@RequestParam("courseId") courseId: Int): ResponseEntity<List<Review>> { val reviews = reviewService.findAllByCourseId(courseId) return ResponseEntity.ok(reviews) } @GetMapping("/{id}") fun getReviewById(@PathVariable("id") id: Int): ResponseEntity<Review> { val review = reviewService.findById(id) return ResponseEntity.ok(review) } @PostMapping fun createReview(@RequestBody review: Review): ResponseEntity<Review> { val savedReview = reviewService.save(review) return ResponseEntity.status(HttpStatus.CREATED).body(savedReview) } @DeleteMapping("/{id}") fun deleteReviewById(@PathVariable("id") id: Int): ResponseEntity<Unit> { reviewService.deleteById(id) return ResponseEntity.noContent().build() } } Here is the MySQL container from docker-compose.yml: YAML mysql-master: image: 'bitnami/mysql:latest' ports: - '3306:3306' volumes: - 'mysql_master_data:/bitnami/mysql/data' - ./init.sql:/docker-entrypoint-initdb.d/init.sql environment: - MYSQL_USER=my_user - MYSQL_PASSWORD=my_password - MYSQL_DATABASE=reviews-db Note: The init.sql contains DDL for the reviews table. Now if we are to execute some HTTP requests like creating a review (all the mentioned requests are in requests.http).
Plain Text POST http://localhost:8070/api/v1/reviews/ Content-Type: application/json { "text": "This is a great course!", "author": "John Doe", "authorTelephone": "555-1234", "authorEmail": "johndoe@example.com", "invoiceCode": "ABC123", "courseId": 123 } We'll observe the following query (p6spy enabled): Plain Text INFO 16784 --- [nio-8070-exec-1] p6spy : #1681730984533 | took 4ms | statement | connection 2| url jdbc:mysql://localhost:3306/reviews-db?allowPublicKeyRetrieval=true&useSSL=false insert into reviews (created_at, last_modified_at, author, author_email, author_telephone, course_id, invoice_code, text, id) values ('2023-04-17T14:29:44.450+0300', '2023-04-17T14:29:44.450+0300', 'John Doe', 'johndoe@example.com', '555-1234', 123, 'ABC123', 'This is a great course!', 2); Data Sharding The reviews application discussed above involves a database storing a large number of reviews for courses. As the number of courses and reviews grows, the reviews table in the database can become very large, making it difficult to manage and slowing down performance. To address this issue, we can implement data sharding, which involves breaking up the reviews table into smaller, more manageable pieces called shards. Each shard contains a subset of the data in the reviews table, with each shard being responsible for a specific range of data based on a shard key, such as the course id. Table sharding can help the reviews application manage its growing reviews table more effectively, improving performance and scalability while also making backups and maintenance tasks easier to manage. But, there is always a but – implementing table sharding manually can be a complex and challenging task. It requires a deep understanding of database design and architecture, as well as knowledge of the specific sharding implementation being used. There are a lot of challenges that can arise when implementing table sharding in a Spring Boot application. It is time we met ShardingSphere: Apache ShardingSphere is an ecosystem to transform any database into a distributed database system, and enhance it with sharding, elastic scaling, encryption features and more. Apache ShardingSphere comes in two flavors: ShardingSphere-JDBC is a lightweight Java framework that provides additional services at Java's JDBC layer. ShardingSphere-Proxy is a transparent database proxy, providing a database server that encapsulates database binary protocol to support heterogeneous languages. ShardingSphere provides a range of features, including data sharding, distributed transaction, read/write splitting, high availability, data migration, query federation, data encryption, and shadow database for full-link online load testing scenarios, to help manage large volumes of data and ensure data integrity and security. For now, we'll focus on ShardingSphere-JDBC's data sharding, and we'll use the following dependencies: Kotlin implementation("org.apache.shardingsphere:shardingsphere-jdbc-core:5.3.2") implementation("org.apache.shardingsphere:shardingsphere-cluster-mode-core:5.3.2") implementation("org.apache.shardingsphere:shardingsphere-cluster-mode-repository-zookeeper:5.3.2") implementation("org.apache.shardingsphere:shardingsphere-cluster-mode-repository-api:5.3.2") Note: The ShardingSphere team had starters for spring boot in 5.1.x versions, but they moved away from starters in favor of consistency in their project and now are recommending using the latest version (non-starter), which can be configured a bit differently, but still fairly simple. 
In my repo, through the commits, you can find examples of spring boot starter configuration too. ShardingSphere-JDBC can be configured mainly in two ways: YAML configuration and Java configuration. I picked the YAML configuration for this article. So at the moment, my datasource configuration from application.yaml looks like this: YAML spring: datasource: username: my_user password: my_password url: jdbc:mysql://localhost:3306/reviews-db?allowPublicKeyRetrieval=true&useSSL=false tomcat: validation-query: "SELECT 1" test-while-idle: true jpa: properties: hibernate: dialect: org.hibernate.dialect.MySQL8Dialect open-in-view: false hibernate: ddl-auto: none To enable ShardingSphere JDBC, we'll have to make it look like this: YAML spring: datasource: driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver url: jdbc:shardingsphere:classpath:sharding.yaml jpa: properties: hibernate: dialect: org.hibernate.dialect.MySQL8Dialect open-in-view: false hibernate: ddl-auto: none We specified that the driver being used for the data source will be the ShardingSphereDriver and the url should be picked based on this file sharding.yaml Okay, pretty simple, right? Let's continue; let's create the sharding.yaml YAML dataSources: master: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: com.mysql.jdbc.Driver jdbcUrl: jdbc:mysql://localhost:3306/reviews-db?allowPublicKeyRetrieval=true&useSSL=false username: my_user password: my_password connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 maxPoolSize: 65 minPoolSize: 1 mode: type: Standalone repository: type: JDBC rules: - !SHARDING tables: reviews: actualDataNodes: master.reviews_$->{0..1} tableStrategy: standard: shardingColumn: course_id shardingAlgorithmName: inline shardingAlgorithms: inline: type: INLINE props: algorithm-expression: reviews_$->{course_id % 2} allow-range-query-with-inline-sharding: true props: proxy-hint-enabled: true sql-show: true Now let's analyze the most important properties: dataSources.master – here lies the definition of our master data source. mode – which can be either standalone with JDBC type or cluster with Zookeeper type (recommended for production), which is used for configuration information persistence rules– here, we can enable various ShardingSphere features like - !SHARDING tables.reviews – here, we describe the actual tables based on the inline syntax rules, meaning that we'll have two tables reviews_0 and reviews_1 sharded by the course_id column. shardingAlgorithms – here, we describe the manual inline sharding algorithm via a groovy expression telling that the reviews table is divided into two tables based on the course_id column. props – here, we enabled intercepting/formatting sql queries (p6spy can be disabled/commented). Important: Before starting our application, we need to make sure that our defined shards are created, so I created two tables in my database: reviews_0 and reviews_1 (init.sql). 
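The actual DDL lives in the repository's init.sql; purely for illustration, the two shards could be created with something like the following. The column types here are assumptions derived from the entity and the logged insert statement, not the real script:

SQL
CREATE TABLE IF NOT EXISTS reviews_0 (
    id               INT PRIMARY KEY,
    created_at       TIMESTAMP,
    last_modified_at TIMESTAMP,
    text             VARCHAR(1000),
    author           VARCHAR(255),
    author_telephone VARCHAR(50),
    author_email     VARCHAR(255),
    invoice_code     VARCHAR(50),
    course_id        INT
);
-- the second shard has exactly the same structure as the first one
CREATE TABLE IF NOT EXISTS reviews_1 LIKE reviews_0;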
Now we are ready to start our application and do some requests: Plain Text ### POST a new review POST http://localhost:8070/api/v1/reviews/ Content-Type: application/json { "text": "This is a great course!", "author": "John Doe", "authorTelephone": "555-1234", "authorEmail": "johndoe@example.com", "invoiceCode": "ABC123", "courseId": 123 } We can see the following log: Plain Text INFO 35412 --- [nio-8070-exec-2] ShardingSphere-SQL: Actual SQL: master ::: insert into reviews_1 (created_at, last_modified_at, author, author_email, author_telephone, course_id, invoice_code, text, id) values (?, ?, ?, ?, ?, ?, ?, ?, ?) ::: [2023-04-17 15:42:01.8069745, 2023-04-17 15:42:01.8069745, John Doe, johndoe@example.com, 555-1234, 123, ABC123, This is a great course!, 4] If we are to execute one more request with a different payload: Plain Text INFO 35412 --- [nio-8070-exec-8] ShardingSphere-SQL: Actual SQL: master ::: insert into reviews_1 (created_at, last_modified_at, author, author_email, author_telephone, course_id, invoice_code, text, id) values (?, ?, ?, ?, ?, ?, ?, ?, ?) ::: [2023-04-17 15:43:47.3267788, 2023-04-17 15:43:47.3267788, Mike Scott, mikescott@example.com, 555-1234, 123, ABC123, This is an amazing course!, 5] We can notice that both our reviews placed by Mike and John went into reviews_1 table, what if we are to change the course_id to 124 and execute the same POST request again? Plain Text INFO 35412 --- [nio-8070-exec-4] ShardingSphere-SQL: Actual SQL: master ::: insert into reviews_0 (created_at, last_modified_at, author, author_email, author_telephone, course_id, invoice_code, text, id) values (?, ?, ?, ?, ?, ?, ?, ?, ?) ::: [2023-04-17 15:44:42.7133688, 2023-04-17 15:44:42.7133688, Mike Scott, mikescott@example.com, 555-1234, 124, ABC123, This is an amazing course!, 6] We can see that our new review got saved in reviews_0 table. Now we can execute two GET requests based on the course_id Plain Text ### GET reviews by course ID GET http://localhost:8070/api/v1/reviews/filter?courseId=123 GET http://localhost:8070/api/v1/reviews/filter?courseId=124 And observe in the logs how routing between our two tables took place. Plain Text INFO 35412 --- [nio-8070-exec-9] ShardingSphere-SQL: Actual SQL: master ::: select review0_.id as id1_0_, review0_.created_at as created_2_0_, review0_.last_modified_at as last_mod3_0_, review0_.author as author4_0_, review0_.author_email as author_e5_0_, review0_.author_telephone as author_t6_0_, review0_.course_id as course_i7_0_, review0_.invoice_code as invoice_8_0_, review0_.text as text9_0_ from reviews_1 review0_ where review0_.course_id=? ::: [123] INFO 35412 --- [nio-8070-exec-5] ShardingSphere-SQL: Actual SQL: master ::: select review0_.id as id1_0_, review0_.created_at as created_2_0_, review0_.last_modified_at as last_mod3_0_, review0_.author as author4_0_, review0_.author_email as author_e5_0_, review0_.author_telephone as author_t6_0_, review0_.course_id as course_i7_0_, review0_.invoice_code as invoice_8_0_, review0_.text as text9_0_ from reviews_0 review0_ where review0_.course_id=? ::: [124] The first select was directed to reviews_1 table and the second one to reviews_0 - Sharding in action! Read-Write Splitting Now let's imagine another problem, the reviews application time may experience high stress during peak hours, leading to slow response times and decreased user experience. To address this issue, we can implement read/write splitting to balance the load and improve performance. 
And how lucky we are that ShardingSphere offers us a read/write splitting solution. Read-write splitting involves directing read queries to replica databases and write queries to a master database, ensuring that read requests do not interfere with write requests and that database performance is optimized. Before configuring the read-write splitting solution, we'll have to make some changes to our docker-compose in order to have some replicas for our master db (credits to bitnami for providing this): YAML mysql-master: image: 'bitnami/mysql:latest' ports: - '3306:3306' volumes: - 'mysql_master_data:/bitnami/mysql/data' - ./init.sql:/docker-entrypoint-initdb.d/init.sql environment: - MYSQL_REPLICATION_MODE=master - MYSQL_REPLICATION_USER=repl_user - MYSQL_REPLICATION_PASSWORD=repl_password - MYSQL_ROOT_PASSWORD=master_root_password - MYSQL_USER=my_user - MYSQL_PASSWORD=my_password - MYSQL_DATABASE=reviews-db mysql-slave: image: 'bitnami/mysql:latest' ports: - '3306' depends_on: - mysql-master environment: - MYSQL_USER=my_user - MYSQL_PASSWORD=my_password - MYSQL_REPLICATION_MODE=slave - MYSQL_REPLICATION_USER=repl_user - MYSQL_REPLICATION_PASSWORD=repl_password - MYSQL_MASTER_HOST=mysql-master - MYSQL_MASTER_PORT_NUMBER=3306 - MYSQL_MASTER_ROOT_PASSWORD=master_root_password Let's start our containers like this (one master and two replicas): docker-compose up --detach --scale mysql-master=1 --scale mysql-slave=2 Now we need the mapped ports for our slaves. $ docker port infra-mysql-slave-1 3306/tcp -> 0.0.0.0:49923 $ docker port infra-mysql-slave-2 3306/tcp -> 0.0.0.0:49922 Okay, now that we have our master and its replicas in place, we are ready to configure two new dataSources like this: YAML dataSources: master: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: com.mysql.jdbc.Driver jdbcUrl: jdbc:mysql://localhost:3306/reviews-db?allowPublicKeyRetrieval=true&useSSL=false username: my_user password: my_password connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 maxPoolSize: 65 minPoolSize: 1 slave0: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: com.mysql.jdbc.Driver jdbcUrl: jdbc:mysql://localhost:49922/reviews-db?allowPublicKeyRetrieval=true&useSSL=false username: my_user password: my_password connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 maxPoolSize: 65 minPoolSize: 1 slave1: dataSourceClassName: com.zaxxer.hikari.HikariDataSource driverClassName: com.mysql.jdbc.Driver jdbcUrl: jdbc:mysql://localhost:49923/reviews-db?allowPublicKeyRetrieval=true&useSSL=false username: my_user password: my_password connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 maxPoolSize: 65 minPoolSize: 1 And then, we can add the read-write splitting rule to the rules: YAML - !READWRITE_SPLITTING dataSources: readwrite_ds: staticStrategy: writeDataSourceName: master readDataSourceNames: - slave0 - slave1 loadBalancerName: readwrite-load-balancer loadBalancers: readwrite-load-balancer: type: ROUND_ROBIN Here I think everything is self-explanatory: we have specified the written data source name to be the master and the read data sources to point to our slaves: slave0 and slave1; and we picked a round-robin load balancer algorithm. 
Important: One last change to be made is regarding the sharding rule, which knows nothing about the newly configured read-write splitting rule and points directly to the master: YAML - !SHARDING tables: reviews: actualDataNodes: readwrite_ds.reviews_$->{0..1} Now our sharding we'll be wrapped by the read-write splitting rule, too, and the data source decision will be made before picking the correct table (pay attention to readwrite_ds.reviews_$->{0..1}). Okay, we can start our application, run the same POST request and observe the logs: Plain Text INFO 22860 --- [nio-8070-exec-1] ShardingSphere-SQL: Actual SQL: master ::: insert into reviews_0 (created_at, last_modified_at, author, author_email, author_telephone, course_id, invoice_code, text, id) values (?, ?, ?, ?, ?, ?, ?, ?, ?) ::: [2023-04-17 16:12:07.25473, 2023-04-17 16:12:07.25473, Mike Scott, mikescott@example.com, 555-1234, 124, ABC123, This is an amazing course!, 7] Nothing surprising here, the sharding still works, and the query took place in the master data source (write data source). But if we are to run a couple of GET requests, we'll observe the following: Plain Text INFO 22860 --- [nio-8070-exec-2] ShardingSphere-SQL: Actual SQL: slave0 ::: select review0_.id as id1_0_, review0_.created_at as created_2_0_, review0_.last_modified_at as last_mod3_0_, review0_.author as author4_0_, review0_.author_email as author_e5_0_, review0_.author_telephone as author_t6_0_, review0_.course_id as course_i7_0_, review0_.invoice_code as invoice_8_0_, review0_.text as text9_0_ from reviews_0 review0_ where review0_.course_id=? ::: [124] INFO 22860 --- [nio-8070-exec-4] ShardingSphere-SQL: Actual SQL: slave1 ::: select review0_.id as id1_0_, review0_.created_at as created_2_0_, review0_.last_modified_at as last_mod3_0_, review0_.author as author4_0_, review0_.author_email as author_e5_0_, review0_.author_telephone as author_t6_0_, review0_.course_id as course_i7_0_, review0_.invoice_code as invoice_8_0_, review0_.text as text9_0_ from reviews_0 review0_ where review0_.course_id=? ::: [124] You can observe read-write splitting in action; our write queries take place in master data sources, but our read queries take place in master's replicas (slave0 and slave1) and this while maintaining the correct sharding rule. Data Masking Off to another imaginary problem regarding our application. Imagine that sensitive information such as customer email, phone number, and invoice code may need to be accessed by certain users or applications while remaining hidden from others due to data privacy regulations. To address this issue, we could implement a data masking solution to mask sensitive data when mapping the result or at the SQL level. But as you guessed, why bother? ShardingSphere is here to save the day with another easy-to-enable feature – data masking. 
So let's update our configuration with this new rule: YAML - !MASK tables: reviews: columns: invoice_code: maskAlgorithm: md5_mask author_email: maskAlgorithm: mask_before_special_chars_mask author_telephone: maskAlgorithm: keep_first_n_last_m_mask maskAlgorithms: md5_mask: type: MD5 mask_before_special_chars_mask: type: MASK_BEFORE_SPECIAL_CHARS props: special-chars: '@' replace-char: '*' keep_first_n_last_m_mask: type: KEEP_FIRST_N_LAST_M props: first-n: 3 last-m: 2 replace-char: '*' Let's see what we have here: table.reviews – we defined three masking algorithms for each column mentioned before maskAlgorithms.md5_mask – we specified the MD5 algorithm type for invoice_code maskAlgorithms.mask_before_special_chars_mask – we configured the MASK_BEFORE_SPECIAL_CHARS algorithm for the author_email column, meaning that all the characters before @ symbol will be replaced with the * symbol. maskAlgorithms.keep_first_n_last_m_mask – we configured the KEEP_FIRST_N_LAST_M algorithm for author_telephone column, meaning that only the first 3 and last 2 characters of a telephone number will stay unchanged; everything in between will be masked by the * symbol. Note: You can find a lot of other masking algorithms here. All right, let's start our application and do the same POST request. Plain Text INFO 35296 --- [nio-8070-exec-1] ShardingSphere-SQL: Actual SQL: master ::: insert into reviews_0 (created_at, last_modified_at, author, author_email, author_telephone, course_id, invoice_code, text, id) values (?, ?, ?, ?, ?, ?, ?, ?, ?) ::: [2023-04-17 16:26:51.8188306, 2023-04-17 16:26:51.8188306, Mike Scott, mikescott@example.com, 555-1234, 124, ABC123, This is an amazing course!, 9] We'll see nothing new here; the exact values that we provided in the body are the ones that got in the database. You can check the master/slave db, too, for that. But the magic comes in if we want to execute our GET request, which gives us the following body. JSON [ { "text": "This is an amazing course!", "author": "Mike Scott", "authorTelephone": "555***34", "authorEmail": "*********@example.com", "invoiceCode": "bbf2dead374654cbb32a917afd236656", "courseId": 124, "id": 9, "lastModifiedAt": "2023-04-17T15:44:43" }, ] As you can see, the data stays unchanged in the database, but when queried and delivered, the telephone, email, and invoice codes got masked according to our defined algorithm in the data masking rule. Conclusion That's it for today, folks! I hope this article gave you a good understanding of how ShardingSphere can be used for database sharding, read-write splitting, and data masking with Spring Boot. We covered the basics of ShardingSphere, how to configure it for sharding, how to use read-write splitting, and how to apply data masking to sensitive data. Make sure to check out all the features that ShardingSphere provides and its amazing documentation here. All the code mentioned in this article can be found here Happy coding!
Caches are very useful software components that all engineers must know. A cache is a transversal component that applies to all tech areas and architecture layers, such as operating systems, data platforms, backend, frontend, and other components. In this article, we are going to describe what a cache is and explain specific use cases focusing on the frontend and client side.

What Is a Cache?

A cache can be defined, in a basic way, as an intermediate memory between the data consumer and the data producer that stores and provides data that will be accessed many times by the same or different consumers. For the data consumer, it is a transparent layer whose only visible effect is improved performance. Usually, the reusability of data provided by the data producer is the key to taking advantage of the benefits of a cache. Performance is the other reason to use a cache system, such as an in-memory database, to provide a high-performance solution with low latency, high throughput, and high concurrency.

For example, how many people query the weather on a daily basis, and how many times do they repeat the same query? Let's suppose there are 1,000 people in New York consulting the weather and 50% repeat the same query twice per day. In this scenario, if we can store the result of the first query as close as possible to the user's device, we achieve two benefits: we improve the user experience because the data is provided faster, and we reduce the number of queries to the data producer/server side. The output is a better user experience and a solution that will support more concurrent users on the platform.

At a high level, there are two caching strategies that we can apply in a complementary way:

Client/Consumer Side: The cached data is stored on the consumer or user side, usually in the browser's memory when we are talking about web solutions (also called a private cache).
Server/Producer Side: The cached data is stored in the components of the data producer's architecture.

Caches, like any other solution, have a series of advantages that we are going to summarize:

Application performance: Caches provide faster response times because they can serve data more quickly.
Reduced load on the server side: When we apply caches in front of a system and reuse a piece of data, we avoid queries/requests to the following layer.
Scalability and cost improvement: As data caching gets closer to the consumer, we increase the scalability and performance of the solution at a lower cost. Components closer to the client side are more scalable and cheaper for three main reasons: these components are focused on performance and availability but have weak consistency; they hold only part of the information (the data used most by the users); and in the case of the browser's local cache, there is no cost for the data producer.

The big challenges of caching are data consistency and data freshness: how the data is kept synchronized and up to date across the organization. Depending on the use case, we will have more or fewer restrictions, because caching images is very different from caching inventory stock or sales behavior.

Client-Side Caches

Speaking about the client-side cache, we can have different types of cache that we are going to analyze a little bit in this article:

HTTP Caching: This caching type is an intermediate cache system, as it depends partially on the server.
Cache API: This is a browser API that allows us to cache requests in the browser.
Custom Local Cache: The front-end app controls the cache storage, expiration, invalidation, and updates.

HTTP Caching

This type caches the HTTP requests for any resource (CSS, HTML, images, video, etc.) in the browser, and the browser manages everything related to storage, expiration, validation, fetching, etc., on behalf of the front end. From the application's point of view, it is almost transparent: the application makes a request in the regular way, and the browser does all the "magic."

Caching is controlled using HTTP headers. On the server side, the server adds cache-specific headers to the HTTP response, for example: "Expires: Tue, 30 Jul 2023 05:30:22 GMT." The browser then knows the resource can be cached, and the next time the client (application) requests the same resource, if the request time is before the expiration date, the request is not sent and the browser returns the local copy of the resource.

HTTP caching also lets you control how responses are distinguished, as the same URL can generate different responses (and their caches should be handled separately). For example, for an API endpoint that returns some data (i.e., http://example.com/my-data), we could use a request header such as Accept to specify whether we want the response in JSON or CSV. Therefore, the cache should be stored with the response depending on the request header(s). For that, the server should set a response header such as Vary: Accept to let the browser know that the cache depends on that value. There are a lot of different headers to control the cache flow and behavior, but it is not the goal of this article to go deep into them. They will probably be addressed in another article.

As we mentioned before, this caching type needs the server to set the resources' expiration, validation, etc. So it is not a pure frontend caching method, but it is one of the simplest ways to cache the resources the front-end application uses, and it is complementary to the other approaches mentioned below. Since it is an intermediate cache, we can even delegate it to a "piece" between the client and the server: for example, a CDN, a reverse proxy (for example, Varnish), etc.

Cache API

It is quite similar to the HTTP caching method, but in this case, we control which requests are stored in or retrieved from the cache. We have to manage the cache expiration ourselves (and it's not easy, because these caches were designed to live "forever"). Even though these APIs are available in windowed contexts, they are very much oriented toward usage in a worker context.

This cache is well suited to offline applications. On the first request, we can fetch and cache all the resources needed (images, CSS, JS, etc.), allowing the application to work offline. It is very useful in mobile applications, for example with the use of maps for our GPS systems in addition to weather data. This allows us to have all the information for our hiking route even if we have no connection to the server. One example of how it works in a windowed context:

const url = 'https://catfact.ninja/breeds'
caches.open('v1').then((cache) => {
  cache.match(url).then((res) => {
    if (res) {
      console.log('it is in cache')
      res.json().then((data) => console.log(data))
    } else {
      console.log('it is NOT in cache')
      fetch(url)
        .then((res) => {
          // store the response under its URL so the next match(url) finds it
          cache.put(url, res.clone())
        })
    }
  })
})

Custom Local Cache

In some cases, we will need more control over the cached data and its invalidation (not just expiration). Cache invalidation is more than just checking the max-age of a cache entry.
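As a minimal sketch of this idea (the names and the LocalStorage choice are mine, not from the article's demo), a custom cache entry can carry its issue time so the application can decide when it is stale, and it can also be invalidated explicitly:

const TTL_MS = 10 * 60 * 1000 // consider cached data stale after 10 minutes

function putInCache(key, data) {
  // store the data together with the time it was issued
  localStorage.setItem(key, JSON.stringify({ issuedAt: Date.now(), data }))
}

function getFromCache(key) {
  const raw = localStorage.getItem(key)
  if (!raw) return null
  const { issuedAt, data } = JSON.parse(raw)
  // expired entries are removed and treated as a cache miss
  if (Date.now() - issuedAt > TTL_MS) {
    localStorage.removeItem(key)
    return null
  }
  return data
}

function invalidate(key) {
  // e.g., called when a WebSocket event notifies us that the data changed
  localStorage.removeItem(key)
}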
Imagine the weather app we mentioned above. This app allows the users to update the weather to reflect the real weather in a place. The app needs to do a request per city and transform the temperature values from F to ºC (this is a simple example: calculations can be more expensive in other use cases). To avoid making requests to the server (even if they are cached), we can do all the requests the first time, put all the data together in a data structure convenient for us, and store it, for example, in the browser's IndexedDB, LocalStorage, SessionStorage, or even in memory (not recommended). The next time we want to show the data, we can get it from the cache (not just the resource data but also the computation we did), saving network and computation time.

We can control the expiration of the caches by storing the issue time next to the data, and we can also control the cache invalidation. Imagine now that the user updates a value in their browser. We can just invalidate the cache and redo the requests and calculations next time, or go further and update our local cache with the new data. Or another user can change the value, and the server will send an event to notify all clients of the change. For example, using WebSockets, our front-end application can listen for these events and invalidate or simply update the cache.

This kind of cache requires work on our side to check the caches, handle the events that can invalidate or update them, etc., but it fits very well in a hexagonal architecture where the data is consumed from the API using a port adapter (repository) that can listen to domain events to react to changes and invalidate or update some caches. This is not a generic cache solution. We need to consider whether it fits our use case, as it requires work on the front-end application side to invalidate the caches or to emit and handle data change events. In most cases, HTTP caching is enough.

Conclusion

A cache solution and a good caching strategy should be a must in any software architecture; without them, our solution will be incomplete and probably not optimized. Caches are our best friends, especially in high-performance scenarios. It may seem that the technical cache invalidation process is the challenge, but the biggest challenge is understanding the business scenarios and use cases to identify the requirements in terms of data freshness and consistency, which allow us to design and choose the best strategy. We will talk about other cache approaches for databases, backends, and in-memory databases in the next articles.
Spark is an analytics engine for large-scale data engineering. Despite its long history, it still has its well-deserved place in the big data landscape. QuestDB, on the other hand, is a time-series database with a very high data ingestion rate. Spark desperately needs data, a lot of it, and QuestDB has it: a match made in heaven. Of course, there is pandas for data analytics, but the key here is the expression large-scale. Unlike pandas, Spark is a distributed system and can scale really well. What does this mean exactly? Let's take a look at how data is processed in Spark. For the purposes of this article, we only need to know that a Spark job consists of multiple tasks, and each task works on a single data partition. Tasks are executed in parallel in stages, distributed on the cluster. Stages depend on the previous ones; tasks from different stages cannot run in parallel. The schematic diagram below depicts an example job: In this tutorial, we will load data from a QuestDB table into a Spark application and explore the inner workings of Spark to refine data loading. Finally, we will modify and save the data back to QuestDB. Loading Data to Spark First things first, we need to load time-series data from QuestDB. I will use an existing table, trades, with just over 1.3 million rows. It contains Bitcoin trades spanning 3 days: not exactly a big data scenario, but good enough to experiment with. The table contains the following columns: symbol (SYMBOL), side (SYMBOL), price (DOUBLE), amount (DOUBLE), and timestamp (TIMESTAMP). The table is partitioned by day, and the timestamp column serves as the designated timestamp. QuestDB accepts connections via the Postgres wire protocol, so we can use JDBC to integrate. You can choose from various languages to create Spark applications, and here we will go for Python. Create the script, sparktest.py: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() # print the number of rows print(df.count()) # do some filtering and print the first 3 rows of the data df = df.filter(df.symbol == 'BTC-USD').filter(df.side == 'buy') df.show(3, truncate=False) Believe it or not, this tiny application already reads data from the database when submitted as a Spark job. Shell spark-submit --jars postgresql-42.5.1.jar sparktest.py The job prints the following row count: Shell 1322570 And these are the first 3 rows of the filtered table: Shell +-------+----+--------+---------+--------------------------+ |symbol |side|price |amount |timestamp | +-------+----+--------+---------+--------------------------+ |BTC-USD|buy |23128.72|6.4065E-4|2023-02-01 00:00:00.141334| |BTC-USD|buy |23128.33|3.7407E-4|2023-02-01 00:00:01.169344| |BTC-USD|buy |23128.33|8.1796E-4|2023-02-01 00:00:01.184992| +-------+----+--------+---------+--------------------------+ only showing top 3 rows Although sparktest.py speaks for itself, it is still worth mentioning that this application has a dependency on the JDBC driver located in postgresql-42.5.1.jar. It cannot run without this dependency; hence, it has to be submitted to Spark together with the application. Optimizing Data Loading With Spark We have loaded data into Spark.
Now we will look at how this was completed and some aspects to consider. The easiest way to peek under the hood is to check QuestDB's log, which should tell us how Spark interacted with the database. We will also make use of the Spark UI, which displays useful insights of the execution, including stages and tasks. Spark Connection to QuestDB: Spark Is Lazy QuestDB log shows that Spark connected three times to the database. For simplicity, I only show the relevant lines in the log: Shell 2023-03-21T21:12:24.031443Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:24.060520Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:12:24.072262Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Spark first queried the database when we created the DataFrame, but as it turns out, it was not too interested in the data itself. The query looked like this: SELECT * FROM trades WHERE 1=0 The only thing Spark wanted to know was the schema of the table in order to create an empty DataFrame. Spark evaluates expressions lazily and only does the bare minimum required at each step. After all, it is meant to analyze big data, so resources are incredibly precious for Spark. Especially memory: data is not cached by default. The second connection happened when Spark counted the rows of the DataFrame. It did not query the data this time, either. Interestingly, instead of pushing the aggregation down to the database by running SELECT count(*) FROM trades, it just queried a 1 for each record: SELECT 1 FROM trades. Spark adds the 1s together to get the actual count. Shell 2023-03-21T21:12:25.692098Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:12:25.693863Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT 1 FROM trades ] 2023-03-21T21:12:25.695284Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:12:25.749986Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:12:25.800765Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:12:25.881785Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Working with the data itself eventually forced Spark to get a taste of the table's content too. Filters are pushed down to the database by default. Shell 2023-03-21T21:12:26.132570Z I pg-server connected [ip=127.0.0.1, fd=28] 2023-03-21T21:12:26.134355Z I i.q.c.p.PGConnectionContext parse [fd=28, q=SELECT "symbol","side","price","amount","timestamp" FROM trades WHERE ("symbol" IS NOT NULL) AND ("side" IS NOT NULL) AND ("symbol" = 'BTC-USD') AND ("side" = 'buy') ] 2023-03-21T21:12:26.739124Z I pg-server disconnected [ip=127.0.0.1, fd=28, src=queue] We can see that Spark's interaction with the database is rather sophisticated and optimized to achieve good performance without wasting resources. The Spark DataFrame is the key component that takes care of the optimization, and it deserves some further analysis. What Is a Spark DataFrame? The name DataFrame sounds like a container to hold data, but we have seen earlier that this is not really true. So, what is a Spark DataFrame, then? 
One way to look at Spark SQL, with the risk of oversimplifying it, is that it is a query engine. df.filter(predicate) is really just another way of saying WHERE predicate. With this in mind, the DataFrame is pretty much a query, or actually more like a query plan. Most databases come with functionality to display query plans, and Spark has it too! Let's check the plan for the above DataFrame we just created: Python df.explain(extended=True) Shell == Parsed Logical Plan == Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Analyzed Logical Plan == symbol: string, side: string, price: double, amount: double, timestamp: timestamp Filter (side#1 = buy) +- Filter (symbol#0 = BTC-USD) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Optimized Logical Plan == Filter ((isnotnull(symbol#0) AND isnotnull(side#1)) AND ((symbol#0 = BTC-USD) AND (side#1 = buy))) +- Relation [symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation(trades) [numPartitions=1] == Physical Plan == *(1) Scan JDBCRelation(trades) [numPartitions=1] [symbol#0,side#1,price#2,amount#3,timestamp#4] PushedFilters: [*IsNotNull(symbol), *IsNotNull(side), *EqualTo(symbol,BTC-USD), *EqualTo(side,buy)], ReadSchema: struct<symbol:string,side:string,price:double,amount:double,timestamp:timestamp> If the DataFrame knows how to reproduce the data by remembering the execution plan, it does not need to store the actual data. This is precisely what we have seen earlier. Spark desperately tried not to load our data, but this can have disadvantages too. Caching Data Not caching the data radically reduces Spark's memory footprint, but there is a bit of jugglery here. Data does not have to be cached because the plan printed above can be executed again and again and again... Now imagine how a mere decently-sized Spark cluster could make our lonely QuestDB instance suffer martyrdom. With a massive table containing many partitions, Spark would generate a large number of tasks to be executed parallel across different nodes of the cluster. These tasks would query the table almost simultaneously, putting a heavy load on the database. So, if you find your colleagues cooking breakfast on your database servers, consider forcing Spark to cache some data to reduce the number of trips to the database. This can be done by calling df.cache(). In a large application, it might require a bit of thinking about what is worth caching and how to ensure that Spark executors have enough memory to store the data. In practice, you should consider caching smallish datasets used frequently throughout the application's life. Let's rerun our code with a tiny modification, adding .cache(): Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .load() \ .cache() # print the number of rows print(df.count()) # print the first 3 rows of the data df.show(3, truncate=False) This time Spark hit the database only twice. First, it came for the schema, the second time for the data: SELECT "symbol","side","price","amount","timestamp" FROM trades. 
Shell 2023-03-21T21:13:04.122390Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:04.147353Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT * FROM trades WHERE 1=0] 2023-03-21T21:13:04.159470Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] 2023-03-21T21:13:05.873960Z I pg-server connected [ip=127.0.0.1, fd=129] 2023-03-21T21:13:05.875951Z I i.q.c.p.PGConnectionContext parse [fd=129, q=SELECT "symbol","side","price","amount","timestamp" FROM trades ] 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] 2023-03-21T21:13:08.479209Z I pg-server disconnected [ip=127.0.0.1, fd=129, src=queue] Clearly, even a few carefully placed .cache() calls can improve the overall performance of an application, sometimes significantly. What else should we take into consideration when thinking about performance? Earlier, we mentioned that our Spark application consists of tasks, which work on the different partitions of the data in parallel. So partitioned data means parallelism, which results in better performance. Spark Data Partitioning Now we turn to the Spark UI. It tells us that the job was done in a single task: The truth is, we had already suspected this. The execution plan told us (numPartitions=1), and we did not see any parallelism in the QuestDB logs either. We can display more details about this partition with a bit of additional code: Python from pyspark.sql.functions import spark_partition_id, min, max, count df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-03 23:59:59.801778|1322570 | +-----------+--------------------------+--------------------------+------------------+ The UI helps us confirm that the data is loaded as a single partition, while QuestDB stores this data in 3 partitions. We should try to fix this. Although it is not recommended, we can try to use DataFrame.repartition(). This call reshuffles data across the cluster while partitioning the data, so it should be our last resort. After running df.repartition(3, df.timestamp), we see 3 partitions, but not exactly the way we expected.
The partitions overlap with one another: Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.698152|2023-02-03 23:59:59.801778|438550 | |1 |2023-02-01 00:00:00.078073|2023-02-03 23:59:57.188894|440362 | |2 |2023-02-01 00:00:00.828943|2023-02-03 23:59:59.309075|443658 | +-----------+--------------------------+--------------------------+------------------+ It seems that DataFrame.repartition() used hashes to distribute the rows across the 3 partitions. This would mean that all 3 tasks would require data from all 3 QuestDB partitions. Let's try this instead: df.repartitionByRange(3, "timestamp"): Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 21:22:35.656399|429389 | |1 |2023-02-01 21:22:35.665599|2023-02-02 21:45:02.613339|470372 | |2 |2023-02-02 21:45:02.641778|2023-02-03 23:59:59.801778|422809 | +-----------+--------------------------+--------------------------+------------------+ This looks better but still not ideal. That is because DataFrame.repartitionByRange() samples the dataset and then estimates the borders of the partitions. What we really want is for the DataFrame partitions to match exactly the partitions we see in QuestDB. This way, the tasks running in parallel in Spark do not cross each other's paths in QuestDB, which is likely to result in better performance. Data source options to the rescue! Let's try the following: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 'trades' table into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades") \ .option("partitionColumn", "timestamp") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.timestamp), max(df.timestamp), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+--------------------------+--------------------------+------------------+ |partitionId|min(timestamp) |max(timestamp) |count(partitionId)| +-----------+--------------------------+--------------------------+------------------+ |0 |2023-02-01 00:00:00.078073|2023-02-01 23:59:59.664083|487309 | |1 |2023-02-02 00:00:00.188002|2023-02-02 23:59:59.838473|457478 | |2 |2023-02-03 00:00:00.565319|2023-02-03 23:59:59.801778|377783 | +-----------+--------------------------+--------------------------+------------------+ After specifying partitionColumn, numPartitions, lowerBound, and upperBound, the situation is much better: the row counts in the partitions match what we saw in the QuestDB logs earlier: rowCount=487309, rowCount=457478 and rowCount=377783. Looks like we did it!
Shell 2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 [rowCount=487309, partitionNameTxn=2, transientRowCount=377783, partitionIndex=0, partitionCount=3] 2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 [rowCount=457478, partitionNameTxn=1, transientRowCount=377783, partitionIndex=1, partitionCount=3] 2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 [rowCount=377783, partitionNameTxn=0, transientRowCount=377783, partitionIndex=2, partitionCount=3] We can check the Spark UI again; it also confirms that the job was completed in 3 separate tasks, each of them working on a single partition. Sometimes it might be tricky to know the minimum and maximum timestamps when creating the DataFrame. In the worst case, you could query the database for those values via an ordinary connection (a small sketch of how to do that appears a little further below). We have managed to replicate our QuestDB partitions in Spark, but data does not always come from a single table. What if the data required is the result of a query? Can we load that, and is it possible to partition it? Options to Load Data: SQL Query vs. Table We can use the query option to load data from QuestDB with the help of a SQL query: Python # 1-minute aggregated trade data df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("query", "SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR") \ .load() Depending on the amount of data and the actual query, you might find that pushing the aggregations down to QuestDB is faster than completing them in Spark. Spark definitely has an edge when the dataset is really large. Now let's try partitioning this DataFrame with the options we used before with the dbtable option. Unfortunately, we will get an error message: Shell Options 'query' and 'partitionColumn' can not be specified together. However, we can trick Spark by just giving the query an alias name. This means we can go back to using the dbtable option again, which lets us specify partitioning.
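As mentioned above, if the lowerBound and upperBound timestamps are not known upfront, one option is to query them over an ordinary connection before building the DataFrame. Below is a minimal sketch of that idea (not part of the original tutorial), assuming the psycopg2 PostgreSQL client is installed and the same connection settings as in the earlier examples; any Postgres-compatible client would work just as well.
Python
import psycopg2

# connect to QuestDB over the Postgres wire protocol (same credentials as above)
conn = psycopg2.connect(host="localhost", port=8812, user="admin",
                        password="quest", dbname="questdb")
try:
    with conn.cursor() as cur:
        # bounds of the designated timestamp column of the 'trades' table
        cur.execute("SELECT min(timestamp), max(timestamp) FROM trades")
        lower_bound, upper_bound = cur.fetchone()
        print(lower_bound, upper_bound)  # feed these into lowerBound/upperBound
finally:
    conn.close()
With the bounds in hand, let's get back to partitioning the query results.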
See the example below: Python from pyspark.sql import SparkSession from pyspark.sql.functions import spark_partition_id, min, max, count # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "min(price) as minimum, max(price) as maximum, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() df = df.withColumn("partitionId", spark_partition_id()) df.groupBy(df.partitionId) \ .agg(min(df.ts), max(df.ts), count(df.partitionId)) \ .show(truncate=False) Shell +-----------+-------------------+-------------------+------------------+ |partitionId|min(ts) |max(ts) |count(partitionId)| +-----------+-------------------+-------------------+------------------+ |0 |2023-02-01 00:00:00|2023-02-01 23:59:00|1440 | |1 |2023-02-02 00:00:00|2023-02-02 23:59:00|1440 | |2 |2023-02-03 00:00:00|2023-02-03 23:59:00|1440 | +-----------+-------------------+-------------------+------------------+ Looking good. Now it seems that we can load any data from QuestDB into Spark by passing a SQL query to the DataFrame. Do we, really? Our trades table is limited to three data types only. What about all the other types you can find in QuestDB? We expect that Spark will successfully map a double or a timestamp when queried from the database, but what about a geohash? It is not that obvious what is going to happen. As always, when unsure, we should test. Type Mappings I have another table in the database with a different schema. This table has a column for each type currently available in QuestDB. SQL CREATE TABLE all_types ( symbol SYMBOL, string STRING, char CHAR, long LONG, int INT, short SHORT, byte BYTE, double DOUBLE, float FLOAT, bool BOOLEAN, uuid UUID, --long128 LONG128, long256 LONG256, bin BINARY, g5c GEOHASH(5c), date DATE, timestamp TIMESTAMP ) timestamp (timestamp) PARTITION BY DAY; INSERT INTO all_types values('sym1', 'str1', 'a', 456, 345, 234, 123, 888.8, 11.1, true, '9f9b2131-d49f-4d1d-ab81-39815c50d341', --to_long128(10, 5), rnd_long256(), rnd_bin(10,20,2), rnd_geohash(35), to_date('2022-02-01', 'yyyy-MM-dd'), to_timestamp('2022-01-15T00:00:03.234', 'yyyy-MM-ddTHH:mm:ss.SSS')); long128 is not fully supported by QuestDB yet, so it is commented out. Let's try to load and print the data; we can also take a look at the schema of the DataFrame: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) Much to my surprise, Spark managed to create the DataFrame and mapped all types. 
Here is the schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DoubleType(), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', TimestampType(), True), StructField('timestamp', TimestampType(), True) ]) It looks pretty good, but you might wonder if it is a good idea to map long256 and geohash types to String. QuestDB does not provide arithmetics for these types, so it is not a big deal. Geohashes are basically 32-base numbers, represented and stored in their string format. The 256-bit long values are also treated as string literals. long256 is used to store cryptocurrency private keys. Now let's see the data: Shell +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double|float|bool|uuid | +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8 |11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+-------------------+-----------------------+ |g5c |date |timestamp | +-----+-------------------+-----------------------+ |q661k|2022-02-01 00:00:00|2022-01-15 00:00:03.234| +-----+-------------------+-----------------------+ It also looks good, but we could omit the 00:00:00 from the end of the date field. We can see that it is mapped to Timestamp and not Date. We could also try to map one of the numeric fields to Decimal. This can be useful if later we want to do computations that require high precision. We can use the customSchema option to customize the column types. 
Our modified code: Python from pyspark.sql import SparkSession # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # create dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "all_types") \ .option("customSchema", "date DATE, double DECIMAL(38, 10)") \ .load() # print the schema print(df.schema) # print the content of the dataframe df.show(truncate=False) The new schema: Shell StructType([ StructField('symbol', StringType(), True), StructField('string', StringType(), True), StructField('char', StringType(), True), StructField('long', LongType(), True), StructField('int', IntegerType(), True), StructField('short', ShortType(), True), StructField('byte', ShortType(), True), StructField('double', DecimalType(38,10), True), StructField('float', FloatType(), True), StructField('bool', BooleanType(), True), StructField('uuid', StringType(), True), # StructField('long128', StringType(), True), StructField('long256', StringType(), True), StructField('bin', BinaryType(), True), StructField('g5c', StringType(), True), StructField('date', DateType(), True), StructField('timestamp', TimestampType(), True) ]) And the data is displayed as: Shell +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |symbol|string|char|long|int|short|byte|double |float|bool|uuid | +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ |sym1 |str1 |a |456 |345|234 |123 |888.8000000000|11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341| +------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+ +------------------------------------------------------------------+----------------------------------------------------+ |long256 |bin | +------------------------------------------------------------------+----------------------------------------------------+ |0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]| +------------------------------------------------------------------+----------------------------------------------------+ +-----+----------+-----------------------+ |g5c |date |timestamp | +-----+----------+-----------------------+ |q661k|2022-02-01|2022-01-15 00:00:03.234| +-----+----------+-----------------------+ It seems that Spark can handle almost all database types. The only issue is long128, but this type is currently a work in progress in QuestDB. When completed, it will be mapped as String, just like long256. Writing Data Back Into the Database There is only one thing left: writing data back into QuestDB. In this example, first, we will load some data from the database and add two new features: a 10-minute moving average, and the standard deviation calculated over the same 10-minute window. Then we will try to save the modified DataFrame back into QuestDB as a new table.
We need to take care of some type mappings, as Double columns are sent to QuestDB as FLOAT8 by default, so we end up with this code: Python from pyspark.sql import SparkSession from pyspark.sql.window import Window from pyspark.sql.functions import avg, stddev, when # create Spark session spark = SparkSession.builder.appName("questdb_test").getOrCreate() # load 1-minute aggregated trade data into the dataframe df = spark.read.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "(SELECT symbol, sum(amount) as volume, " "round((max(price)+min(price))/2, 2) as mid, " "timestamp as ts " "FROM trades WHERE symbol = 'BTC-USD' " "SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table") \ .option("partitionColumn", "ts") \ .option("numPartitions", "3") \ .option("lowerBound", "2023-02-01T00:00:00.000000Z") \ .option("upperBound", "2023-02-04T00:00:00.000000Z") \ .load() # add new features window_10 = Window.partitionBy(df.symbol).rowsBetween(-10, Window.currentRow) df = df.withColumn("ma10", avg(df.mid).over(window_10)) df = df.withColumn("std", stddev(df.mid).over(window_10)) df = df.withColumn("std", when(df.std.isNull(), 0.0).otherwise(df.std)) # save the data as 'trades_enriched' df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save() All works, but… we soon realize that our new table, trades_enriched, is not partitioned and does not have a designated timestamp, which is not ideal. Obviously, Spark has no idea about QuestDB specifics. It would work better if we created the table upfront and Spark only saved the data into it. We drop the table and re-create it; this time, it is partitioned and has a designated timestamp: SQL DROP TABLE trades_enriched; CREATE TABLE trades_enriched ( volume DOUBLE, mid DOUBLE, ts TIMESTAMP, ma10 DOUBLE, std DOUBLE ) timestamp (ts) PARTITION BY DAY; The table is empty and waiting for the data. We rerun the code; all works, no complaints. The data is in the table, and it is partitioned. One aspect of writing data into the database is whether we are allowed to create duplicates. What if I try to rerun the code again without dropping the table? Will Spark let me save the data this time? No, we get an error: Shell pyspark.sql.utils.AnalysisException: Table or view 'trades_enriched' already exists. SaveMode: ErrorIfExists. The last part of the error message looks interesting: SaveMode: ErrorIfExists. What is SaveMode? It turns out we can configure what should happen if the table already exists. Our options are: errorifexists: the default behavior; an error is returned if the table already exists, so Spark is playing it safe here. append: data will be appended to the rows already present in the table. overwrite: the content of the table will be replaced entirely by the newly saved data. ignore: if the table is not empty, our save operation gets ignored without any error. We have already seen how errorifexists behaves; append and ignore seem simple enough to just work. However, overwrite is not straightforward. The content of the table must be cleared before the new data can be saved.
Spark will delete and re-create the table by default, which means losing partitioning and the designated timestamp. In general, we do not want Spark to create tables for us. Luckily, with the truncate option, we can tell Spark to use TRUNCATE to clear the table instead of deleting it: Python # save the data as 'trades_enriched', overwrite if already exists df.write.format("jdbc") \ .option("url", "jdbc:postgresql://localhost:8812/questdb") \ .option("driver", "org.postgresql.Driver") \ .option("user", "admin").option("password", "quest") \ .option("dbtable", "trades_enriched") \ .option("truncate", True) \ .option("createTableColumnTypes", "volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE") \ .save(mode="overwrite") The above works as expected. Conclusion Our ride might seem bumpy, but we finally have everything working. Our new motto should be "There is a config option for everything!". To summarize what we have found: We can use Spark's JDBC data source to integrate with QuestDB. It is recommended to use the dbtable option, even if we use a SQL query to load data. Always try to specify the partitioning options (partitionColumn, numPartitions, lowerBound, and upperBound) when loading data; the partitions should ideally match the QuestDB partitions. Sometimes it makes sense to cache some data in Spark to reduce the number of trips to the database. It can be beneficial to push work down into the database, depending on the task and how much data is involved. It makes sense to make use of QuestDB's time-series-specific features, such as SAMPLE BY, instead of trying to rewrite them in Spark. Type mappings can be customized via the customSchema option when loading data. When writing data into QuestDB, always specify the desired save mode. It generally works better if you create the table upfront and do not let Spark create it, because this way you can add partitioning and a designated timestamp. If you select the overwrite save mode, you should also enable the truncate option to make sure Spark does not delete the table, so partitioning and the designated timestamp are not lost. Type mappings can be customized via the createTableColumnTypes option when saving data. I mentioned only the config options that are most likely to be tweaked when working with QuestDB; the complete set of options can be found here: Spark data source options. What Could the Future Bring? Overall, everything works, but it would be nice to have a much more seamless integration, where partitioning would be taken care of automagically. Some type mappings could use better defaults, too, when saving data into QuestDB. The overwrite save mode could default to using TRUNCATE. More seamless integration is not impossible to achieve. If QuestDB provided its own JDBCDialect implementation for Spark, the above nuances could be handled. We should probably consider adding this. Finally, there is one more thing we did not mention yet: data locality. That is because, currently, QuestDB cannot run as a cluster. However, we are actively working on a solution — check out The Inner Workings of Distributed Databases for more information. When the time comes, we should ensure that data locality is also considered. Ideally, each Spark node would work on tasks that require partitions loaded from the local (or closest) QuestDB instance. However, this is not something we should be concerned about at this moment... for now, just enjoy data crunching!
Data synchronization is one of the most important aspects of any product. Apache Kafka is one of the most popular choices when designing a system that expects near-real-time propagation of large volumes of data. Even though Kafka has simple yet powerful semantics, working with it requires insight into its architecture. This article summarizes the most important design aspects of Kafka as a broker and of applications that act as data producers or consumers. About Kafka Apache Kafka originated at LinkedIn and was developed as a highly scalable system for distributing telemetry and usage data. Over time, Kafka evolved into a general-purpose streaming data backbone that combines high throughput with low data delivery latencies. Internally, Kafka is a distributed log. A (commit) log is an append-only data structure: producers append data (log records) to its end, and subscribers read the log from the beginning to replay the records. This data structure is used, for example, in a database's write-ahead log. A distributed log means that the actual data structure is not hosted on a single node but is distributed across many nodes to achieve both high availability and high performance. Internals and Terminology Before we jump into how Kafka is used by applications, let's quickly go through the basic terminology and architecture so we understand the guarantees that Kafka provides to its users. A single Kafka node is called a broker. The broker receives messages from producers and distributes them to consumers. Producers send the messages into distributed logs called topics (in traditional messaging, this corresponds to a queue). To scale the performance of a single topic beyond the capacity of a single node, each topic may be split into multiple partitions. To achieve high availability and durability of the stored data, each partition has a leader (performing all read and write operations) and multiple followers. Partitions are assigned to brokers automatically, and the failover of a broker is also automatic and transparent to developers using Kafka. On the backend, the assignment of leader/replica roles is orchestrated using leader election in Apache ZooKeeper or, in newer versions of Kafka, using the KRaft protocol. In the diagram, we can see a Kafka cluster that consists of five brokers. In this scenario, two topics (A and B) were created. Topic A has two partitions, while topic B has only a single partition. The cluster was set up with a replication factor of 3 — this means there are always three copies of the data stored, allowing two nodes to fail without losing the data. A replication factor of 3 is a sane default since it guarantees tolerance of a node failure even during the maintenance of one other broker. You may ask why topic A was divided into two partitions; what is the benefit? First, notice that the leader of Partition 1 is on a different node than the leader of Partition 2. This means that if clients produce/consume data to/from this topic, they may use the disk throughput and performance of 2 nodes instead of 1. On the other hand, there is a cost to this decision: message ordering is guaranteed only within a single partition. Producers and Consumers Now that we have some understanding of how Kafka works internally, let's take a look at how the situation looks from the perspective of producers and consumers. Producer Let's start with the producer.
As mentioned above, replication and the assignment of topics/partitions are a concern of Kafka itself and are not visible to producers or consumers. So the producer only needs to know which topics it wishes to send data to and whether these topics have multiple partitions. In case the topic is partitioned (entity-1), the producer may provide, as part of its code, a "partitioner," which is a simple class that decides which partition a given record belongs to. So in Kafka, partitioning is driven by the producer. In case the producer does not specify any partitioner (but the topic is partitioned), a round-robin strategy is used. Round-robin is completely fine for entities where exact ordering is not important — there is no causal relation between the records. For example, if you have a topic with sensor measurements, these measurements may be sent by the sensors on a scheduled basis, so there is no particular order to the records, and round-robin provides an easy way to balance the records among the individual partitions. Our example with sensors also shows another important detail: there may be multiple producers sending records into one topic. In the diagram above, we see that we have many sensors (producers) creating two types of records: humidity (in green) and CO2 concentration (in red). Each of the records also contains information about the sensor itself, such as its serial number (an integer is used in this example for the sake of simplicity). Because every sensor ever produced is capable of measuring humidity, while only some of the sensors support CO2 measurements, the designers of the system have decided to split the humidity records into two partitions using the serial number of the sensor. Notice that there is strict ordering within each of the humidity partitions (and within the CO2 partition), but there is no ordering of records between the partitions. In other words: B will always be processed before D and E, and A will always be processed before C, but there is no ordering guarantee between B and A (or C). Consumer A Kafka consumer is an application that reads records from a topic. In Kafka, there is one more concept: the consumer group — a set of consumers that cooperate. When there are multiple consumers from the same group subscribed to the same topic, Kafka always distributes the partitions among the consumers of the group in such a way that each partition is read by exactly one of them (a single consumer may read multiple partitions, but one partition will not be read by multiple consumers of the group). In case some of the consumers fail, Kafka will automatically reassign partitions to other consumers (note that consumers do not need to subscribe to all topics). But in case of a failover or switchover, how does Kafka know where to continue? We have already said that a topic contains all the messages (even the messages that were already read). Does this mean that the consumer must read the whole topic again? The answer is that the consumer is able to continue where the previous one stopped. Kafka uses a concept called an offset, which is essentially a pointer to a message in the partition; it stores the position of the last processed message for any given consumer group. Offset While it may seem trivial, the concept of offsets and distributed logs is extremely powerful. It is possible to dynamically add new consumers, and these consumers (starting from offset=0) are able to catch up with the full history of data.
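To make the producer and consumer mechanics above more concrete, here is a minimal sketch (not from the original article) using the kafka-python client; the broker address, topic, and group names are illustrative assumptions.
Python
import json
from kafka import KafkaProducer, KafkaConsumer

# Producer: key records by sensor serial number so all readings of one sensor
# land in the same partition and keep their relative order.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("sensor-humidity", key="42", value={"sensor": 42, "humidity": 0.61})
producer.flush()

# Consumer: a brand-new consumer group starts from the earliest offset
# and catches up with the full history of the topic.
consumer = KafkaConsumer(
    "sensor-humidity",
    bootstrap_servers="localhost:9092",
    group_id="humidity-history-replay",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for record in consumer:
    print(record.partition, record.offset, record.value)
Because the new consumer group has no committed offsets yet, auto_offset_reset="earliest" makes it start from offset 0 and replay the full history.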
With traditional queues, by contrast, a newly added consumer would have to obtain the historical data some other way, because messages are deleted once read in classic messaging. That kind of data sync is more complex: either the producer publishes the messages into the one queue used for increments (and affects all other consumers), or the consumer needs to use some other mechanism (such as REST or another dedicated queue), which creates data synchronization issues, as two independent, unsynchronized mechanisms are used. Another huge benefit is that the consumer may decide at any time to reset the offset and read from the beginning. Why would one do that? First, there is a class of analytical applications (such as machine learning) that requires processing the whole dataset, and an offset reset gives them such a mechanism. Second, it may happen that there is a bug in the consumer that corrupts the data. In this case, the consumer's product team may fix the issue and reset the offset, effectively reprocessing the whole dataset and replacing the corrupt data with the correct one. This mechanism is heavily used in Kappa architecture. Retention and Compaction We stated above that the commit log is append-only, but this does not imply that the log is immutable. In practice, the log keeps the full history of all changes only in certain types of deployments, where it is necessary (for auditing purposes or to have a real Kappa architecture). This strategy is powerful but also has a price. Firstly, performance: the consumer needs to go through huge volumes of data in order to catch up with the head of the log. Secondly, if the log contains any sensitive information, it is hard to get rid of it (which makes this type of log unfriendly to regulations that require data to be erased on request). But in many cases, the logs have some fixed retention — either size- or time-based. In this case, the log contains only a window of messages (and any overflow is automatically erased). Using the log as a buffer keeps the log size reasonable and also ensures that the data does not stay in the log forever (making it easier to adhere to compliance requirements). However, this also makes the log unusable for certain use cases — one of these use cases is when you want all the records to be available to newly subscribed consumers. The last type of log is the so-called compacted log. In a compacted log, each record has not only a value but also a key. Whenever a new record is appended to the topic and there is already a record with the same key, Kafka will eventually compact the log and erase the original record. Be aware that this means that, for a certain time, there will be multiple records with the same key, and the up-to-date value is always in the most recently inserted record. This does not require any additional handling if you already go with at-least-once semantics (delivery of the message is guaranteed, but in case of any uncertainty, for example due to network issues, the message may be delivered multiple times). You can picture the compacted log as a streaming form of a database that allows anyone to subscribe to the newest data. This picture of Kafka is quite accurate, because there is a duality between a stream and a table. Both concepts are merely different views of the same thing — in a SQL database, we also work with tables, but under the hood, there is a commit log. Similarly, any Kafka topic (compacted ones included) can be viewed as a table. In fact, the Kafka Streams library builds on this duality.
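As a hedged illustration of this duality (not from the original article; the topic name, broker address, and record shapes are assumptions), a consumer can materialize a compacted topic into a simple in-memory table by keeping only the latest value per key:
Python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "trends",                              # a compacted topic
    bootstrap_servers="localhost:9092",
    group_id="trends-materializer",
    auto_offset_reset="earliest",
    key_deserializer=lambda k: k.decode("utf-8"),
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=5000,              # stop iterating when no new records arrive
)

# The "table": latest value per key. Replaying the compacted topic rebuilds it;
# newer records with the same key simply overwrite older ones.
table = {}
for record in consumer:
    table[record.key] = record.value

print(table.get("room-1"))  # latest trend for a given room, if present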
There is even ksqlDB (Kafka SQL), which allows you to issue SQL statements over records in Kafka. In the topology above, we see that the inbound measurement topics (temperature, humidity, CO2…) are normal topics with retention set to seven days. The retention allows the developers to time travel a week back in case they find a bug in their implementation. From these inbound topics, the data is read by two services (each in a separate consumer group). The measurements history service stores the telemetry in a time-series database (long-term storage), which may be used as a source for graphs and widgets in the UI of the system. The trends service aggregates the data (creating 24-hour windows of the measurements in a given room) so it can be used by downstream controllers, and sends the results through a compacted topic. The topic is compacted because there is no need to keep any historical records (only the latest trend is valid). On the other hand, the customer may add a new device (and associated controller) at any time, so we want to ensure that the latest readings for the given room are always present. Patterns and Principles In the previous paragraphs, we presented the basic concepts. In this section, we'll expand on those and discuss a few other Kafka patterns. Eventually Consistent Architecture In a data synchronization architecture based on messaging, we want to ensure that whenever new data is produced in one product, it will be available to all relevant products in near-real-time. This means that if the user creates or modifies some entity in product A and navigates to product B, they should (ideally) see the up-to-date version of this entity. However, since the individual products use multiple independent databases, it is not practical to have a distributed transaction mechanism and atomic consistency between these databases. Instead, we go with eventual consistency. In this model, the data producer is responsible for publishing any record it creates/updates/deletes to Kafka, from which an interested consumer may retrieve the record and store it locally. This propagation between systems takes some time: typically less than a second between the publishing of the record and the moment it becomes available to subscribers. Also, the consumer may optimize writes to its database (e.g., batch writes). During this time period, some of the systems (the replicas) have slightly stale data. It may also happen that some of the replicas will not be able to catch up for some time (downtime, network partition). But structurally, it is guaranteed that all the systems will eventually converge to the same results and will hold a consistent dataset — hence the term "eventual consistency." Optimizing Writes to the Local Database As alluded to in the previous paragraph, consumers may want to optimize writes to their local database. For example, it is highly undesirable to commit on a per-record basis in relational databases, because a transaction commit is a relatively expensive operation. It may be much wiser to commit in batches (say, every 5,000 records or at most every 500 ms, whichever comes first). Kafka is well able to support this, because committing an offset is in the hands of the consumer. Another example is AWS Redshift, a data warehouse/OLAP database in which commits are very expensive. Also, in Redshift, every commit invalidates its query caches.
As a result, the cluster takes the hit of the commit twice: once to perform the commit itself and a second time when all previously cached queries must be re-evaluated. For these reasons, you may want to commit to Redshift (and similar technologies) on a scheduled basis, every X minutes, to limit the blast radius of this action. A last example is NoSQL databases that do not support transactions; there, it may be just fine to stream the data on a per-record basis (obviously, depending on the capabilities of the DB engine). There is one takeaway: different replicas may use slightly different persistence strategies, even if they consume the same data. Always assume that there is a possibility that the other side does not have the data available yet. Referential Integrity Between Topics It is important to understand that since Kafka-based data synchronization is eventually consistent, there is no implicit referential or causal integrity between the individual topics (or partitions). When it comes to referential integrity, consumers should be written so that they expect to receive, for example, measurements for a room whose details they have not received yet. Some consumers may handle this situation by not showing the data at all until all the dimensions are present (for example, you can't turn on the ventilation in a room you do not know about). For other systems, the missing reference is not really an issue: the average temperature in the house will be the same, regardless of the presence of room details. For these reasons, it may be impractical to impose any strict restrictions centrally. Stateful Processing Kafka consumers may require stateful processing — such as aggregation, window functions, and deduplication. Also, the state itself may not be of a trivial size, or there may be a requirement that, in case of a failure, the replica is able to continue almost instantly. In these cases, storing the results in the RAM of the consumer is not the best choice. Luckily, the Kafka Streams library has out-of-the-box support for RocksDB — a high-performance embedded key-value store. RocksDB is able to store the results both in RAM and on disk. Caching Strategy and Parallelism Closely related to stateful processing is the caching strategy. Kafka is, by design, not well suited for the competing-consumers style of work, because each partition is assigned to exactly one processor. If you want to implement competing consumers, you need to create significantly more partitions than there are consumers within the system to emulate the behavior. However, this is not the way parallelism should be handled in Kafka-based systems (in case you really need a job queue of unrelated jobs, you will be much better off with RabbitMQ, SQS, ActiveMQ, and the like). Kafka is a stream processing system, and it is expected that the records in one partition somehow relate to each other and should be processed together. The individual partitions act as data shards, and since Kafka guarantees that each of these partitions will be assigned to exactly one consumer, the consumer can be sure that there is no other competing processor, so it can cache the results as it sees fit in its local cache and does not need to implement any distributed caching (such as Redis). In case the processor fails or crashes, Kafka will just reassign the partition to another consumer, which will populate its local caches and continue.
This design of stream processing is significantly simpler than competing consumers. There is one consideration, though: the partitioning strategy, because it is defined by the producer by default, while different consumers may have multiple, mutually incompatible needs. For this reason, it is common in the Kafka world to re-partition a topic. In our scenario, it would work the following way: in the diagram, we can see that the Trends service produces trends into its topic. This topic is round-robin partitioned and compacted. ProductX, which focuses on large industrial customers, needs to partition the data some other way, for example, by customerId. In this case, ProductX may write a simple application that re-partitions the data (re-partitioning is often managed under the hood by the Kafka Streams library). In other words, it reads the data from the source topic and writes it into another topic, managed by ProductX, which partitions the data differently (per business unit in this case). With this partitioning, ProductX is able to shard the non-overlapping business units to dedicated processing nodes, massively increasing the processing parallelism. The internal ProductX topic may have just a short retention (such as 24 hours) because it does not hold the authoritative copy of the data, and the data can easily be replayed from the original topic if necessary. In Kafka Streams, you may want to join several entities in order to combine the data (this is a common use case). Beware that if you have multiple consumers, you need to have the inbound topics partitioned in the exact same way (the same partitioner, based on the join key, and the same number of partitions). Only this way is it guaranteed that entities with matching join keys will be received by the same consumer (processor). Summary In this article, we discussed the overall architecture of Kafka and how this low-level architecture allows the broker to easily scale horizontally (thanks to partitioning) and to ensure high availability and durability (thanks to the leader/replica design and leader election). We also went through the basics of designing Kafka-based topologies, explained eventual consistency and how it affects the guarantees given to our applications, and learned how to use Kafka's different types of logs and their retention. While Kafka may seem overwhelming at first, it is important to realize that internally it is based on the plain old distributed log. This relatively simple internal structure is what gives Kafka its straightforward semantics, high throughput, and low data propagation latencies: qualities that are crucial for building any data pipeline.
Continuing our Snowflake blog series, after learning about setting up a Snowflake account using System-defined roles, we will explore the best practices for loading data from a file into Snowflake. Let’s begin! Snowflake supports file-based data ingestion through internal and external stages. However, there are various factors to consider when performing data ingestion, including the frequency of data arrival, file sizes, and the data loading techniques used such as copy command, external tables, and Snowpipe, among others. Additionally, the file format used, such as CSV, Parquet, JSON, etc., also plays a critical role in choosing the right approach. Making the wrong choice can result in increased costs and slower performance. This blog provides insights into these approaches to help you select the best one while loading data into Snowflake. Faster Loading of Parquet Files From S3 If you want to load Parquet files from S3 faster into Snowflake, you should not use the COPY command, which is suitable for CSV format files placed in external stages. Instead, it is recommended to use external tables on top of the Parquet file and enable the “vector scan performance flag” scan property. However, to enable this property, you need to contact Snowflake support and have it enabled at the account level. Parallel Operations To improve the efficiency of loading data from stages into Snowflake, it is recommended to create files in the range of 100-250MB with compression. By default, each node can handle 8 threads in parallel. Therefore, if you have 32 files, a Medium warehouse with 4 nodes can process all of them simultaneously, running 32 threads in parallel. It’s important to keep in mind that the performance of parallel operations in Snowflake can also be affected by the complexity of the operations being performed. Additionally, Snowflake may not always run the threads as defined in parallel due to various factors such as resource availability and load balancing. Therefore, it’s important to monitor and adjust the parallelism accordingly to ensure optimal performance. It is important to ensure that the warehouse used for data loading runs for at least a minute to fully utilize the cost for active time. Purging Files From Stage To optimize performance, it is recommended to remove files from the stage after successful loading using the COPY command with the PURGE=True option. This ensures that the staged files are deleted once loaded, which not only improves performance but also eliminates the need for COPY commands to scan the entire bucket path to check for files to load. Loading Large Files in Snowflake Suppose you need to load a single large file in gigabytes, which may or may not contain bad data in some of its rows. In such cases, it is advisable to load the data by ignoring the errors. Failing to do so can result in the wastage of credits. For example, if the data is being processed using a warehouse and an error row is identified after an hour, the entire operation will be aborted, leading to credit wastage. Using “ON_ERROR” as “CONTINUE” will load the good data and ignore the bad rows. However, it is always recommended to load large files by splitting them into smaller chunks so that parallel processing can be utilized using a warehouse. If that’s not possible and you still want to load a large file, it’s recommended to check if it’s okay to continue loading by ignoring the bad data. 
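As a hedged illustration of that approach (the table, stage, file, and connection parameters below are placeholders, not from the original post), the COPY command can be issued through Snowflake's Python connector with ON_ERROR set to CONTINUE and PURGE enabled:
Python
import snowflake.connector

# Illustrative sketch using the snowflake-connector-python package.
conn = snowflake.connector.connect(
    user="MY_USER", password="MY_PASSWORD", account="MY_ACCOUNT",
    warehouse="LOAD_WH", database="MY_DB", schema="PUBLIC",
)
try:
    conn.cursor().execute("""
        COPY INTO big_table
        FROM @my_stage/large_file.csv.gz
        FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
        ON_ERROR = 'CONTINUE'   -- load the good rows, skip the bad ones
        PURGE = TRUE            -- remove the staged file after a successful load
    """)
finally:
    conn.close()
ON_ERROR = 'CONTINUE' loads the good rows and skips the bad ones, while PURGE = TRUE implements the stage cleanup recommended earlier.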
Best Practice While Loading JSON Data When loading JSON data into Snowflake, it's important to note that the output from some applications can be a single large array consisting of multiple rows combined. However, due to the limitations of the VARIANT datatype (which has a maximum size of 16 MB), such a huge JSON output cannot be loaded in its entirety. Moreover, it's not appropriate to load a single huge array as a single row. Instead, the array should be split into multiple rows. To achieve this, use the STRIP_OUTER_ARRAY=TRUE option in the FILE_FORMAT parameter of the COPY command. This option removes the outer array and identifies the individual rows, allowing for efficient loading of JSON data into Snowflake. Snowpipe File Loading Best Strategies Loading data using Snowpipe incurs charges for the time taken to load each file placed on the stage, plus the overhead of maintaining the queue of files waiting to be loaded. This means that if a large number of files with different sizes are continuously loaded into the stage location, it could lead to slower loading of the data and increased expenses. To ensure effective loading, it is best to keep file sizes within the 100-250 MB range and to avoid a sudden surge of files in the stage location, which would increase the queue. You can optimize the loading process by placing the files in the stage at a frequency close to the time it takes Snowpipe to load a single file. This will help ensure efficient utilization of resources and reduce costs. Make Use of the Internal Stage Wherever Applicable To optimize data loading in Snowflake, it is recommended to use the internal stage instead of external stages such as S3, as it results in faster loading into the Snowflake table. It is also important to compare the cost of storing data in Snowflake's stage location with that of the object storage of cloud providers. To reduce costs, it is advisable to purge the data from the stage when it is no longer needed. Thank you for reading this article. In the upcoming installment of this series, we will explore the best practices for optimizing the use of Snowflake warehouses and tables.
In this tutorial, we will explore the exciting world of MicroStream, a powerful open-source platform that enables ultrafast data processing and storage. Specifically, we will look at how MicroStream can leverage the new Jakarta Data and NoSQL specifications, which offer cutting-edge solutions for handling data in modern applications. With MicroStream, you can use these advanced features and supercharge your data processing capabilities while enjoying a simple and intuitive development experience. So whether you're a seasoned developer looking to expand your skill set or just starting in the field, this tutorial will provide you with a comprehensive guide to using MicroStream to explore the latest in data and NoSQL technology. MicroStream is a high-performance, in-memory, NoSQL database platform for ultrafast data processing and storage. One of the critical benefits of MicroStream is its ability to achieve lightning-fast data access times, thanks to its unique architecture that eliminates the need for disk-based storage and minimizes overhead. With MicroStream, you can easily store and retrieve large amounts of data in real time, making it an ideal choice for applications that require rapid data processing and analysis, such as financial trading systems, gaming platforms, and real-time analytics engines. MicroStream provides a simple and intuitive programming model, making it easy to integrate into your existing applications and workflows. One of the main differences between MicroStream and other databases is its focus on in-memory storage. While traditional databases rely on disk-based storage, which can lead to slower performance due to disk access times, MicroStream keeps all data in memory, allowing for much faster access. Additionally, MicroStream's unique architecture allows it to achieve excellent compression rates, further reducing the memory footprint and making it possible to store even more data in a given amount of memory. Finally, MicroStream is designed for simplicity and ease of use. It provides a developer-friendly interface and minimal dependencies, making it easy to integrate into your existing development workflow. MicroStream Eliminates the Impedance Mismatch Object-relational impedance mismatch refers to the challenge of mapping data between object-oriented programming languages and relational databases. Object-oriented programming languages like Java or Python represent data using objects and classes, whereas relational databases store data in tables, rows, and columns. This fundamental difference in data representation can lead to challenges in designing and implementing database systems that work well with object-oriented languages. One consequence of the object-relational impedance mismatch is that it can be challenging to maintain consistency between the object-oriented model and the relational database. For example, if an object in an object-oriented system has attributes that are related to one another, mapping those relationships to the relational database schema may be challenging. Additionally, object-oriented systems often support inheritance, which can be tough to represent in a relational database schema. While various techniques and patterns can be used to address the object-relational impedance mismatch, such as object-relational mapping (ORM) tools or database design patterns, these solutions often come with their own trade-offs and may introduce additional complexity to the system.
Ultimately, achieving a balance between object-oriented programming and relational database design requires careful consideration of the specific needs and constraints of the application at hand. MicroStream can help reduce the object-relational impedance mismatch by eliminating the need for a conversion layer between object-oriented programming languages and relational databases. Since MicroStream is an in-memory, NoSQL database platform that stores data as objects, it provides a natural fit for object-oriented programming languages, eliminating the need to map between object-oriented data structures and relational database tables. With MicroStream, developers can work directly with objects in their code without worrying about the complexities of mapping data to a relational database schema. This can result in increased productivity and improved performance, as there is no additional conversion layer to introduce overhead and complexity. Moreover, MicroStream's in-memory storage model ensures fast and efficient data access without expensive disk I/O operations. Data can be stored and retrieved quickly and efficiently, allowing for rapid processing and analysis of large amounts of data. Overall, by eliminating the object-relational impedance mismatch and providing a simple, efficient, and performant way to store and access data, MicroStream helps developers focus on building great applications rather than worrying about database architecture and design. MicroStream can also deliver better performance by avoiding the conversion to and from objects. As the next step on your journey, let's create a MicroProfile application. MicroStream Meets the Jakarta Specifications Now that I have introduced MicroStream, let's create our microservices application using Eclipse MicroProfile. The first step is to go to the Eclipse MicroProfile Starter, where you can define the configuration of your initial project. Your application will be a simple library service using Open Liberty, running with Java 17 and MicroStream. With the project downloaded, we must add the dependency that integrates MicroStream with MicroProfile. This dependency will later move into MicroStream itself, so this coordinate is a temporary home for the integration:

XML
<dependency>
    <groupId>expert.os.integration</groupId>
    <artifactId>microstream-jakarta-data</artifactId>
    <version>${microstream.data.version}</version>
</dependency>

The beauty of this integration is that it works with any vendor that supports MicroProfile 5 or higher. Currently, we're using Open Liberty. This integration enables both Jakarta persistence specifications: NoSQL and Data. Jakarta NoSQL and Jakarta Data are two related specifications developed by the Jakarta EE Working Group, aimed at providing standard APIs for working with NoSQL databases and managing data in Java-based applications. With the project defined, let's create a Book entity. The code below shows the annotated class; we currently use the Jakarta NoSQL annotations.

Java
@Entity
public class Book {

    @Id
    private String isbn;

    @Column("title")
    private String title;

    @Column("year")
    private int year;

    @JsonbCreator
    public Book(@JsonbProperty("isbn") String isbn,
                @JsonbProperty("title") String title,
                @JsonbProperty("year") int year) {
        this.isbn = isbn;
        this.title = title;
        this.year = year;
    }
}

The next step is the Jakarta Data part, where you can define a single interface that provides several capabilities against this database.
Java
@Repository
public interface Library extends CrudRepository<Book, String> {
}

The last step is the resource, where we'll make the service available.

Java
@Path("/library")
@ApplicationScoped
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class LibraryResource {

    private final Library library;

    @Inject
    public LibraryResource(Library library) {
        this.library = library;
    }

    @GET
    public List<Book> allBooks() {
        return this.library.findAll().collect(Collectors.toUnmodifiableList());
    }

    @GET
    @Path("{id}")
    public Book findById(@PathParam("id") String id) {
        return this.library.findById(id)
                .orElseThrow(() -> new WebApplicationException(Response.Status.NOT_FOUND));
    }

    @PUT
    public Book save(Book book) {
        return this.library.save(book);
    }

    @DELETE
    @Path("{id}")
    public void deleteBy(@PathParam("id") String id) {
        this.library.deleteById(id);
    }
}

Conclusion Jakarta NoSQL and Jakarta Data are critical specifications that provide a standard set of APIs and tools for managing data in Java-based applications. Jakarta NoSQL enables developers to interact with various NoSQL databases using a familiar interface, while Jakarta Data provides APIs for working with data in multiple formats. These specifications help reduce the complexity and cost of application development and maintenance, enabling developers to achieve greater interoperability and portability across different NoSQL databases and data formats. Furthermore, MicroStream provides a high-performance, in-memory NoSQL database platform that eliminates the need for a conversion layer between object-oriented programming languages and relational databases, reducing the object-relational impedance mismatch and increasing productivity and performance. By combining the power of MicroStream with the standard APIs provided by Jakarta NoSQL and Jakarta Data, developers can create robust and scalable applications that easily handle large amounts of data. The combination of MicroStream, Jakarta NoSQL, and Jakarta Data offers robust tools and specifications for managing data in modern Java-based applications. These technologies help streamline the development process and enable developers to focus on building great applications that meet the needs of their users. Find the source code on GitHub.
In the previous article, we discussed the emergence of Data Lakehouses as the next-generation data management solution designed to address the limitations of traditional data warehouses and Data Lakes. Data Lakehouses combine the strengths of both approaches, providing a unified platform for storing, processing, and analyzing diverse data types. This innovative approach offers the flexibility, scalability, and advanced analytics capabilities that are essential for businesses to remain competitive in today's data-driven landscape. In this article, we will delve deeper into the architecture and components of Data Lakehouses, exploring the interconnected technologies that power this groundbreaking solution. The Pillars of Data Lakehouse Architecture A Data Lakehouse is a comprehensive data management solution that combines the best aspects of data warehouses and Data Lakes, offering a unified platform for storing, processing, and analyzing diverse data types. The Data Lakehouse architecture is built upon a system of interconnected components that work together seamlessly to provide a robust and flexible data management solution. In this section, we discuss the fundamental components of the Data Lakehouse architecture and how they come together to create an effective and convenient solution for the end user. At the core of the Data Lakehouse lies unified data storage. This layer is designed to handle various data types and formats, including structured, semi-structured, and unstructured data. The storage layer's flexibility is enabled through storage formats such as Apache Parquet, ORC, and Delta Lake, which are compatible with distributed computing frameworks and cloud-based object storage services. By unifying data storage, Data Lakehouses allow organizations to easily ingest and analyze diverse data sources without extensive data transformation or schema modifications. Another essential aspect of the Data Lakehouse architecture is data integration and transformation. Data Lakehouses excel at handling data ingestion and transformation from various sources by incorporating built-in connectors and support for a wide array of data integration tools, such as Apache NiFi, Kafka, or Flink. These technologies enable organizations to collect, transform, and enrich data from disparate sources, including streaming data, providing real-time insights and decision-making capabilities. By offering seamless data integration, Data Lakehouses help reduce the complexity and cost associated with traditional data integration processes. Metadata management is a critical component of a Data Lakehouse, facilitating data discovery, understanding, and governance. Data cataloging tools like Apache Hive, Apache Atlas, or AWS Glue allow organizations to create a centralized repository of metadata about their data assets. The comprehensive view of data lineage, schema, relationships, and usage patterns provided by metadata management tools enhances data accessibility, ensures data quality, and enables better compliance with data governance policies. Data processing and analytics capabilities are also integral to the Data Lakehouse architecture. Unified query engines like Apache Spark, Presto, or Dremio provide a single interface for querying data using SQL or other query languages, integrating batch and real-time processing for both historical and live data.
Moreover, Data Lakehouses often support advanced analytics and machine learning capabilities, making it easier for organizations to derive valuable insights from their data and build data-driven applications. Finally, data governance and security are crucial in any data-driven organization. Data Lakehouses address these concerns by providing robust data quality management features like data validation, data lineage tracking, and schema enforcement. Additionally, Data Lakehouses support role-based access control, which enables organizations to define granular access permissions to different data assets, ensuring that sensitive information remains secure and compliant with regulatory requirements. Optimizing Storage Formats for Data Lakehouses In a Data Lakehouse architecture, the storage layer is crucial for delivering high performance, efficiency, and scalability while handling diverse data types. This section will focus on the storage formats and technologies used in Data Lakehouses and their significance in optimizing storage for better performance and cost-effectiveness. Columnar storage formats such as Apache Parquet and ORC are key components of Data Lakehouses. By storing data column-wise, these formats offer improved query performance, enhanced data compression, and support for complex data types. This enables Data Lakehouses to handle diverse data types efficiently without requiring extensive data transformation. Several storage solutions have been developed to cater to the unique requirements of Data Lakehouses. Delta Lake, Apache Hudi, and Apache Iceberg are three notable examples. Each of these technologies has its own set of advantages and use cases, making them essential components of modern Data Lakehouse architectures. Delta Lake is a storage layer project explicitly designed for Data Lakehouses. Built on top of Apache Spark, it integrates seamlessly with columnar storage formats like Parquet. Delta Lake provides ACID transaction support, schema enforcement and evolution, and time travel features, which enhance reliability and consistency in data storage. Apache Hudi is another storage solution that brings real-time data processing capabilities to Data Lakehouses. Hudi offers features such as incremental data processing, upsert support, and point-in-time querying, which help organizations manage large-scale datasets and handle real-time data efficiently. Apache Iceberg is a table format for large, slow-moving datasets in Data Lakehouses. Iceberg focuses on providing better performance, atomic commits, and schema evolution capabilities. It achieves this through a novel table layout that uses metadata more effectively, allowing for faster queries and improved data management. The intricacies of Delta Lake, Apache Hudi, and Apache Iceberg, as well as their unique advantages, are fascinating topics on their own. In one of our upcoming articles, we will delve deeper into these technologies, providing a comprehensive understanding of their role in Data Lakehouse architecture. Optimizing storage formats for Data Lakehouses involves leveraging columnar formats and adopting storage solutions like Delta Lake, Apache Hudi, and Apache Iceberg. These technologies work together to create an efficient and high-performance storage layer that can handle diverse data types and accommodate the growing data needs of modern organizations. 
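To show how a columnar format and a transactional table format come together in practice, here is a minimal, hedged sketch that writes and reads a Delta Lake table using Apache Spark's Java API. The bucket paths are hypothetical, and the example assumes the delta-spark package and an appropriate cloud storage connector are available on the classpath.

Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DeltaLakeExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("lakehouse-delta-demo")
                // These two settings enable Delta Lake's SQL support on a stock Spark build.
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog",
                        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();

        // Append raw events to a Parquet-backed Delta table with ACID guarantees
        // and schema enforcement.
        Dataset<Row> events = spark.read().json("s3a://my-bucket/raw/events/"); // hypothetical source
        events.write().format("delta").mode("append").save("s3a://my-bucket/lakehouse/events");

        // Read the current state of the table, or "time travel" to an earlier version.
        Dataset<Row> current = spark.read().format("delta")
                .load("s3a://my-bucket/lakehouse/events");
        Dataset<Row> firstVersion = spark.read().format("delta")
                .option("versionAsOf", "0")
                .load("s3a://my-bucket/lakehouse/events");

        current.show();
        firstVersion.show();
        spark.stop();
    }
}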
Embracing Scalable and Distributed Processing in Data Lakehouses Data Lakehouse architecture is designed to address modern organizations' growing data processing needs. By leveraging distributed processing frameworks and techniques, Data Lakehouses can ensure optimal performance, scalability, and cost-effectiveness. Apache Spark, a powerful open-source distributed computing framework, is a foundational technology in Data Lakehouses. Spark efficiently processes large volumes of data and offers built-in support for advanced analytics and machine learning workloads. By supporting various programming languages, Spark serves as a versatile choice for organizations implementing distributed processing. Distributed processing frameworks like Spark enable the parallel execution of tasks, which is essential for handling massive datasets and complex analytics workloads. Data partitioning strategies divide data into logical partitions, optimizing query performance and reducing the amount of data read during processing. Resource management and scheduling are crucial for distributed processing in Data Lakehouses. Tools like Apache Mesos, Kubernetes, and Hadoop YARN orchestrate and manage resources across a distributed processing environment, ensuring that tasks are executed efficiently and resources are allocated optimally. In-memory processing techniques significantly improve the performance of analytics and machine learning tasks by caching data in memory instead of reading it from disk. This reduces latency and results in faster query execution and better overall performance. Data Lakehouses embrace scalable and distributed processing technologies like Apache Spark, partitioning strategies, resource management tools, and in-memory processing techniques. These components work together to ensure Data Lakehouses can handle the ever-growing data processing demands of modern organizations. Harnessing Advanced Analytics and Machine Learning in Data Lakehouses Data Lakehouse architectures facilitate advanced analytics and machine learning capabilities, enabling organizations to derive deeper insights and drive data-driven decision-making. This section discusses the various components and techniques employed by Data Lakehouses to support these essential capabilities. First, the seamless integration of diverse data types in Data Lakehouses allows analysts and data scientists to perform complex analytics on a wide range of structured and unstructured data. This integration empowers organizations to uncover hidden patterns and trends that would otherwise be difficult to discern using traditional data management systems. Second, the use of distributed processing frameworks such as Apache Spark, which is equipped with built-in libraries for machine learning and graph processing, enables Data Lakehouses to support advanced analytics workloads. By leveraging these powerful tools, Data Lakehouses allow data scientists and analysts to build and deploy machine learning models and perform sophisticated analyses on large datasets. Additionally, Data Lakehouses can be integrated with various specialized analytics tools and platforms. For example, integrating Jupyter Notebooks and other interactive environments provides a convenient way for data scientists and analysts to explore data, develop models, and share their findings with other stakeholders.
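As a concrete, hedged illustration of the partitioning and in-memory processing techniques described earlier in this section, here is a minimal Spark sketch in Java that repartitions a table by a query key, caches the working set, and writes a partitioned aggregate. The paths and column names are hypothetical assumptions.

Java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DistributedProcessingExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("lakehouse-processing-demo")
                .getOrCreate();

        // Read columnar data and repartition it by the key that matches the dominant
        // query pattern, so the work for each customer is colocated on one executor.
        Dataset<Row> orders = spark.read()
                .parquet("s3a://my-bucket/lakehouse/orders")
                .repartition(col("customer_id"));

        // Cache the working set in memory to avoid re-reading from object storage
        // across the multiple analytics passes that typically follow.
        orders.cache();

        // A batch aggregation whose output is laid out on disk partitioned by region,
        // which speeds up later partition pruning.
        orders.groupBy(col("region"), col("customer_id"))
                .agg(sum(col("amount")).alias("lifetime_value"))
                .write()
                .partitionBy("region")
                .parquet("s3a://my-bucket/lakehouse/customer_value");

        spark.stop();
    }
}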
To further enhance the capabilities of Data Lakehouses, machine learning platforms like TensorFlow, PyTorch, and H2O.ai can be integrated to support the development and deployment of custom machine learning models. These platforms provide advanced functionality and flexibility, enabling organizations to tailor their analytics and machine learning efforts to their specific needs. Lastly, real-time analytics and stream processing play an important role in Data Lakehouses. Technologies like Apache Kafka and Apache Flink enable organizations to ingest and process real-time data streams, allowing them to respond more quickly to market changes, customer needs, and other emerging trends. Ensuring Robust Data Governance and Security in Data Lakehouses Data Lakehouses prioritize data governance and security, addressing the concerns of organizations regarding data privacy, regulatory compliance, and data quality. This section delves into the various components and techniques that facilitate robust data governance and security in Data Lakehouses. Data cataloging and metadata management tools play a crucial role in establishing effective data governance within a Data Lakehouse. Tools such as Apache Atlas, AWS Glue, and Apache Hive provide centralized repositories for metadata, enabling organizations to track data lineage, discover data assets, and enforce data governance policies. Fine-grained access control is essential for maintaining data privacy and security in Data Lakehouses. Role-based access control (RBAC) and attribute-based access control (ABAC) mechanisms allow organizations to define and enforce user access permissions, ensuring that sensitive data remains secure and available only to authorized users. Data encryption is another key component of Data Lakehouse security. By encrypting data both at rest and in transit, Data Lakehouses ensure that sensitive information remains protected against unauthorized access and potential breaches. Integration with key management systems like AWS Key Management Service (KMS) or Azure Key Vault further enhances security by providing centralized management of encryption keys. Data Lakehouses also incorporate data quality and validation mechanisms to maintain the integrity and reliability of the data. Data validation tools like Great Expectations, data profiling techniques, and automated data quality checks help identify and address data inconsistencies, inaccuracies, and other issues that may impact the overall trustworthiness of the data. Auditing and monitoring are essential for ensuring compliance with data protection regulations and maintaining visibility into Data Lakehouse operations. Data Lakehouses can be integrated with logging and monitoring solutions like Elasticsearch, Logstash, and Kibana (the ELK Stack) or AWS CloudTrail, providing organizations with a comprehensive view of their data management activities and facilitating effective incident response. By prioritizing data privacy, regulatory compliance, and data quality, Data Lakehouses enable organizations to confidently manage their data assets and drive data-driven decision-making in a secure and compliant manner. Embracing the Data Lakehouse Revolution The Data Lakehouse architecture is a game-changing approach to data management, offering organizations the scalability, flexibility, and advanced analytics capabilities necessary to thrive in the era of big data.
By combining the strengths of traditional data warehouses and Data Lakes, Data Lakehouses empower businesses to harness the full potential of their data, driving innovation and informed decision-making. In this article, we have explored the key components and technologies that underpin the Data Lakehouse architecture, from data ingestion and storage to processing, analytics, and data governance. By understanding the various elements of a Data Lakehouse and how they work together, organizations can better appreciate the value that this innovative approach brings to their data management and analytics initiatives. As we continue our series on Data Lakehouses, we will delve deeper into various aspects of this revolutionary data management solution. In upcoming articles, we will cover topics such as the comparison of Delta Lake, Apache Hudi, and Apache Iceberg – three storage solutions that are integral to Data Lakehouse implementations – as well as best practices for Data Lakehouse design, implementation, and operation. Additionally, we will discuss the technologies and tools that underpin Data Lakehouse architecture, examine real-world use cases that showcase the transformative power of Data Lakehouses, and explore the intricacies and potential of this groundbreaking approach. Stay tuned for more insights and discoveries as we navigate the exciting journey of Data Lakehouse architectures together!