Java has long been the backbone of enterprise applications, but its role in data engineering is growing rapidly. Today, modern data platforms like Snowflake are giving Java developers first-class support to write powerful, flexible, and scalable data logic directly inside the database environment. This guide demonstrates how Java developers can leverage familiar tools — like classes, streams, and DataFrames — to build user-defined functions (UDFs) and stored procedures for real-time and batch data processing. By writing this logic in Java, you can encapsulate business rules, perform asynchronous operations, interact with structured or unstructured data, and maintain robust, reusable codebases within your data workflows. We’ll explore practical patterns for using Java to read dynamic files, execute logic across large datasets, and orchestrate non-blocking operations, all using standard Java concepts, enhanced by Snowflake’s integration where needed. Whether you're building a custom transformation layer or ingesting external files, these examples are grounded in Java fundamentals with modern data-driven use cases. Stored Procedures for Java Developers Stored procedures allow Java developers to embed business logic directly into the data layer, making it possible to automate complex workflows, orchestrate data transformations, and run administrative tasks all using familiar Java syntax and structure. With support for in-line or pre-compiled Java classes, developers can define handler methods that accept parameters, return single values or tabular results, and interact with data via platform-specific APIs like Snowpark. These procedures are ideal for scenarios that require conditional logic, looping, exception handling, and integration with external resources or files. Java stored procedures also support advanced features such as role-based execution (caller’s vs. owner’s rights), asynchronous operations, and logging for monitoring and debugging — making them a powerful tool for scalable and maintainable data applications in cloud environments like Snowflake. SQL CREATE OR REPLACE PROCEDURE file_reader_java_proc_snowflakefile(input VARCHAR) RETURNS VARCHAR LANGUAGE JAVA RUNTIME_VERSION = 11 HANDLER = 'FileReader.execute' PACKAGES=('com.snowflake:snowpark:latest') AS $$ import java.io.InputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import com.snowflake.snowpark_java.types.SnowflakeFile; import com.snowflake.snowpark_java.Session; class FileReader { public String execute(Session session, String fileName) throws IOException { InputStream input = SnowflakeFile.newInstance(fileName).getInputStream(); return new String(input.readAllBytes(), StandardCharsets.UTF_8); } } $$; CALL file_reader_java_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json')); Asynchronous Processing With Java Stored Procedures Asynchronous processing is a critical pattern for building responsive, non-blocking systems, and now, Java developers can bring that same approach to data workflows. Using JDBC within a stored procedure, you can execute long-running tasks asynchronously via methods like executeAsyncQuery(). For instance, the getResultJDBC() procedure uses SYSTEM$WAIT(10) to simulate a delay, showing how to trigger a non-blocking operation inside a data platform. This pattern is especially useful when orchestrating workflows that involve waiting on external services or time-consuming computations. 
By offloading these tasks asynchronously, you avoid tying up compute resources and allow other operations to continue in parallel, making your data processes more efficient and scalable — all with standard Java patterns and APIs. SQL CREATE OR REPLACE PROCEDURE getResultJDBC() RETURNS VARCHAR LANGUAGE JAVA RUNTIME_VERSION = 11 PACKAGES = ('com.snowflake:snowpark:latest') HANDLER = 'TestJavaSP.asyncBasic' AS $$ import java.sql.*; import net.snowflake.client.jdbc.*; class TestJavaSP { public String asyncBasic(com.snowflake.snowpark.Session session) throws Exception { Connection connection = session.jdbcConnection(); SnowflakeStatement stmt = (SnowflakeStatement)connection.createStatement(); ResultSet resultSet = stmt.executeAsyncQuery("CALL SYSTEM$WAIT(10)"); resultSet.next(); return resultSet.getString(1); } } $$; User-Defined Functions (UDFs) With Java User-defined functions (UDFs) let Java developers bring custom logic directly into SQL queries, making them a powerful tool for extending built-in functionality with familiar Java code. UDFs are lightweight, reusable functions that always return a value — ideal for tasks like string manipulation, calculations, data formatting, or even simple validations. Unlike stored procedures, which are invoked as standalone SQL commands, UDFs are embedded directly into queries, allowing you to apply logic row by row across large datasets. For example, let’s say you’re working with a column that contains arrays of strings, and you want to concatenate those values into a single sentence. With a Java UDF, you can write a simple static method using String.join, package it into a JAR, and then call it in your SQL as if it were a native function. For example, imagine processing product tags, names, or user inputs stored as arrays. This approach helps standardize and flatten those values in line with your analytics workflows. The best part? You get to maintain your logic in clean, testable Java code while keeping execution inside the data platform for performance and scalability. SQL CREATE OR REPLACE TABLE string_array_table(id INTEGER, a ARRAY); INSERT INTO string_array_table (id, a) SELECT 1, ARRAY_CONSTRUCT('Hello'); INSERT INTO string_array_table (id, a) SELECT 2, ARRAY_CONSTRUCT('Hello', 'Jay'); INSERT INTO string_array_table (id, a) SELECT 3, ARRAY_CONSTRUCT('Hello', 'Jay', 'Smith'); CREATE OR REPLACE FUNCTION concat_varchar_2(a ARRAY) RETURNS VARCHAR LANGUAGE JAVA HANDLER = 'TestFunc_2.concatVarchar2' TARGET_PATH = '@~/TestFunc_2.jar' AS $$ class TestFunc_2 { public static String concatVarchar2(String[] strings) { return String.join(" ", strings); } } $$; SELECT concat_varchar_2(a) FROM string_array_table ORDER BY id; Understanding Parallel Execution of Java UDFs When deploying Java user-defined functions (UDFs) in a distributed environment like Snowflake, it’s essential to understand how they execute in parallel, both across JVMs and within each JVM. Across JVMs: Java UDFs run across multiple workers in the underlying compute infrastructure, each of which may launch its own JVM. These JVMs operate independently, so there's no shared memory or state between them. This means static variables, caches, or singletons won’t persist across workers.Within a JVM: A single JVM can invoke the same UDF method concurrently on multiple threads. As a result, your handler method must be thread-safe. Avoid shared mutable state or use synchronization when necessary. 
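To make the thread-safety point concrete, here is a minimal sketch, not taken from the examples above, of an in-line Java UDF whose handler keeps all state in its arguments and local variables; the function, class, and method names are hypothetical. Because nothing is shared between calls, concurrent invocations on multiple threads within one JVM cannot interfere with each other.

SQL

CREATE OR REPLACE FUNCTION normalize_tag(x VARCHAR)
RETURNS VARCHAR
LANGUAGE JAVA
HANDLER = 'TagNormalizer.normalize'
AS
$$
import java.util.Locale;

class TagNormalizer {
    // No shared mutable state: each call reads only its own argument and
    // local variables, so the handler is safe under concurrent invocation.
    public static String normalize(String x) {
        if (x == null) {
            return "";
        }
        return x.trim().toLowerCase(Locale.ROOT);
    }
}
$$;

SELECT normalize_tag('  Premium-SUV ');

If a handler genuinely needs shared state, such as a cache, prefer java.util.concurrent types or explicit synchronization over plain static fields.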
If your UDF is marked IMMUTABLE, Snowflake optimizes execution by reusing results for repeated calls with the same input, like memoization. This is particularly useful for deterministic functions, and it’s a good practice to flag them as immutable when appropriate to boost performance. SQL /* Create a Jar file with the following Class class MyClass { private double x; // Constructor public MyClass() { x = Math.random(); } // Handler public double myHandler() { return x; } } */ CREATE FUNCTION my_java_udf_1() RETURNS DOUBLE LANGUAGE JAVA IMPORTS = ('@sales_data_stage/HelloRandom.jar') HANDLER = 'MyClass.myHandler'; CREATE FUNCTION my_java_udf_2() RETURNS DOUBLE LANGUAGE JAVA IMPORTS = ('@sales_data_stage/HelloRandom.jar') HANDLER = 'MyClass.myHandler'; SELECT my_java_udf_1(), my_java_udf_2() FROM table1; Creating and Calling a Simple In-Line Java UDF This example illustrates how to create and call a straightforward in-line Java User-Defined Function (UDF) that simply returns the VARCHAR value it receives. The function is defined with the optional CALLED ON NULL INPUT clause. This ensures that the function is executed even if the input value is NULL. Although the function would return NULL whether or not the clause is included, you can adjust the function to handle NULL values differently, such as returning an empty string instead. SQL CREATE OR REPLACE FUNCTION echo_varchar(x VARCHAR) RETURNS VARCHAR LANGUAGE JAVA CALLED ON NULL INPUT HANDLER = 'TestFunc.echoVarchar' TARGET_PATH = '@~/testfunc.jar' AS 'class TestFunc { public static String echoVarchar(String x) { return x; } }'; SELECT echo_varchar('Hello Java'); Passing a GEOGRAPHY Value to an In-Line Java UDF This example demonstrates how to pass a GEOGRAPHY value to an in-line Java UDF, enabling spatial data processing within the function. SQL CREATE OR REPLACE FUNCTION geography_equals(x GEOGRAPHY, y GEOGRAPHY) RETURNS BOOLEAN LANGUAGE JAVA PACKAGES = ('com.snowflake:snowpark:1.2.0') HANDLER = 'TestGeography.compute' AS $$ import com.snowflake.snowpark_java.types.Geography; class TestGeography { public static boolean compute(Geography geo1, Geography geo2) { return geo1.equals(geo2); } } $$; CREATE OR REPLACE TABLE geocache_table (id INTEGER, g1 GEOGRAPHY, g2 GEOGRAPHY); INSERT INTO geocache_table (id, g1, g2) SELECT 1, TO_GEOGRAPHY('POINT(-122.35 37.55)'), TO_GEOGRAPHY('POINT(-122.35 37.55)'); INSERT INTO geocache_table (id, g1, g2) SELECT 2, TO_GEOGRAPHY('POINT(-122.35 37.55)'), TO_GEOGRAPHY('POINT(90.0 45.0)'); SELECT id, g1, g2, geography_equals(g1, g2) AS "EQUAL?" FROM geocache_table ORDER BY id; Stored Procedures vs UDFs: Know the Difference Here’s a quick comparison: FeatureStored ProcedureUser-Defined Function (UDF)PurposePerform admin or batch operations using SQL.Return a computed value, often used in queries.Return ValueOptional — may return status or custom values.Required — must return a value explicitly.SQL IntegrationCalled as stand-alone SQL commands.Embedded inline in SQL (e.g., SELECT MyFunc(col)).Best ForDDL/DML, workflows, automation.Transformations, expressions, calculations. 
Additionally: UDFs return a value; stored procedures need not UDF return values are directly usable in SQL; stored procedure return values may not be UDFs can be called in the context of another statement; stored procedures are called independently Multiple UDFs may be called with one statement; a single stored procedure is called with one statement UDFs may access the database with simple queries only; stored procedures can execute DDL and DML statements Final Thoughts This article explored key techniques for building robust Java-based solutions using Snowpark APIs. We covered creating and calling Java stored procedures and UDFs, performing asynchronous operations, handling unstructured data through file access, and returning tabular results. These tools allow you to enhance your workflows with custom logic, parallelism, and integration with external data formats. As you continue to develop with Java and Snowpark, consider how these features can optimize your data workflows and enable more complex processing scenarios. Whether you're encapsulating business logic, processing files at scale, or improving performance through parallel execution, Snowpark's support for Java provides the flexibility to build scalable and maintainable solutions.
According to recent research, enterprises lose approximately $406 million every year because low-quality data prevents their AI applications from working efficiently [1][2][3]. The same research projects that accumulated losses will reach a staggering $745 billion by the end of 2025. For developers and data engineers, data quality is not an option or a recommendation; it is a technical requirement. This article describes the challenges, sources, and methods involved in building AI systems that depend on a steady flow of high-quality information.

Financial loss is only the tip of the iceberg when it comes to the destructive effects of poor data quality. For data-driven organizations, insights must be actionable, and as AI systems are applied in more and more domains, the cost of using poor data keeps rising. In healthcare, for example, erroneous patient data can result in misdiagnoses or incorrect treatment. In finance, faulty market data can lead to bad investment advice that costs billions. The impact of subpar data is also felt throughout an organization's entire AI ecosystem: it produces biased or inconsistent machine learning models, which erode trust in AI-driven insights. That, in turn, stifles AI adoption and innovation, placing companies at a competitive disadvantage in an increasingly data-driven business environment.

Real-world experience shows that to overcome these challenges, organizations need to make data quality one of the pillars of their AI strategy. In practice, this means better processes and systems for data governance, investment in data cleansing and validation tools, and upskilling at all levels to raise data literacy across the enterprise. Data may be the new oil, but like oil it has to be refined: businesses must elevate raw data into trustworthy information before humans and machines can extract its full value.

Why Data Quality Matters

AI models tend to magnify the accuracy issues of their input data, which can have far-reaching real-world effects. Here are some of the more prominent examples:

Amazon's recruiting tool: Historical hiring data encoded gender bias, which resulted in discrimination against women in hiring [4][5][6].
Microsoft's chatbot Tay: Released inappropriate tweets after being trained on unmoderated social media data [7][8][9].
IBM Watson for Oncology: Failed because it was trained on synthetic data that did not reflect actual patient cases [9][10][11].

"80% of the work in machine learning is data preparation," as Andrew Ng (Stanford AI Lab) points out [12]. Even a powerful model such as GPT-4 can fail badly without clean data to train on.

Key Challenges in 2025

Bias and Incomplete Datasets

Problem: AI models trained on biased or incomplete datasets make poor predictions. For instance, facial recognition systems trained on non-diverse data have shown error rates 34% higher for dark-skinned people [13].
Solution: Adopt automated bias-detection frameworks, integrate tools like Label Studio and Datafold, or use an active learning loop, depending on what is needed to improve the diversity of dataset labels [14][15]; a simple check is sketched below.
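The following sketch, which assumes hypothetical column names rather than any particular framework, compares per-group error rates against the overall error rate and flags subgroups that deviate sharply: the kind of signal an automated bias-detection step would raise before training continues.

Python

import pandas as pd

def error_rate_by_group(df, group_col, label_col, pred_col):
    # Per-group mean of the 0/1 error indicator.
    errors = (df[label_col] != df[pred_col]).astype(int)
    return errors.groupby(df[group_col]).mean()

# Toy evaluation results; in practice these come from a held-out, labeled dataset.
results = pd.DataFrame({
    "skin_tone": ["dark", "dark", "dark", "light", "light", "light"],
    "label":     [1, 0, 1, 1, 0, 1],
    "pred":      [0, 0, 0, 1, 0, 1],
})

per_group = error_rate_by_group(results, "skin_tone", "label", "pred")
overall = (results["label"] != results["pred"]).mean()

# Flag any subgroup whose error rate is more than 25% above the overall rate
# (the threshold is arbitrary and should be tuned per use case).
flagged = per_group[per_group > 1.25 * overall]
print(per_group)
print("Flagged groups:", list(flagged.index))

Tools like Label Studio or Datafold can then be pointed at the flagged slices to prioritize relabeling or additional data collection.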
Regulatory Compliance

Pain point: Emerging data regulations like GDPR and India's new 2025 AI regulations require auditable data lineage and ethical AI practices [16].
Solution: Use modern data governance tools, such as IBM Watson Knowledge Catalog, which provide role-based access controls and differential privacy mechanisms to help ensure compliance [16][17].

Infrastructure Costs

Problem: The cost of training LLMs like GPT-4 surpasses $10 million in GPU resources [18].
Solution: Use small language models (SLMs) such as Microsoft's Phi-3, which achieve close to identical performance at a tenth of the cost by relying on curated, high-quality training data. Recent advances such as DeepSeek R1 likewise show that strong language models can be trained on comparatively low-cost configurations [18].

Tools and Techniques for Ensuring Data Quality in AI

Best Data Quality Tools for AI (2025)

Tool | Strengths | Use Case
Ataccama | AI-powered anomaly detection | Enterprise-scale data quality
Informatica | Prebuilt AI rules for observability | Real-time streaming data
Datafold | Automated regression testing | ML model input monitoring
Great Expectations | Schema validation | Open-source data pipelines

Code Example: Real-Time Data Validation with TensorFlow

Python

import tensorflow_data_validation as tfdv

data_path = "data/training_records.tfrecord"  # path to your TFRecord file

# Calculate statistics and infer a schema from them
stats = tfdv.generate_statistics_from_tfrecord(data_location=data_path)
schema = tfdv.infer_schema(stats)

# Detect values that fall outside the inferred schema
anomalies = tfdv.validate_statistics(stats, schema)

# Display detected anomalies
tfdv.display_anomalies(anomalies)

Best Practices for Developers

Driving Proactive Data Governance

Data contracts: Establish expected input and output JSON schemas as contracts [19].
Automated lineage tracking: Use Apache Atlas to trace data flow from the source to the AI models [19].

Adopt Real-Time Monitoring

Monitor feature drift (e.g., missing data > 2% in the input, or KL divergence > 0.1) to catch inconsistencies before deployment [14][15].

Optimize Data Labeling

Prioritize labeling the most uncertain samples using active learning to improve model performance:

Python

import numpy as np
from scipy.stats import entropy

# 'model' is assumed to expose predict_proba (e.g., a scikit-learn classifier),
# and 'unlabeled_data' is the pool of as-yet-unlabeled feature rows.
probs = model.predict_proba(unlabeled_data)       # shape: (n_samples, n_classes)
uncertainties = entropy(probs, axis=1)            # per-sample predictive entropy
query_indices = np.argsort(uncertainties)[-100:]  # pick the top 100 most uncertain samples

Case Studies: Learning in the Field

Success: AstraZeneca's 3-Minute Analytics

Challenge: Poor-quality clinical trial data delayed drug development.
Solution: Implemented Talend Data Fabric to automate validation [20].
Outcome: Reduced analysis time by 98%, accelerating FDA approvals by six months [19].

Failure: Self-Driving Scrub Machines

Challenge: Faulty sensor data led to frequent collisions.
Root cause: Noisy LiDAR data from uncalibrated IoT devices [13].
Fix: Applied PyOD anomaly detection and implemented daily sensor calibration [13].

Factors Influencing the Future of Data Quality for AI

Self-healing data pipelines: Schema drift will be corrected automatically by AI-driven tools such as GPT-4-powered features in Snowflake [19][20].
Synthetic data generation: Healthcare AI will employ generative adversarial networks (GANs) to generate privacy-compliant datasets [21][22].
AI-powered data governance: Platforms like Databricks Unity Catalog and Amazon Macie will automate data quality rules based on machine learning [16][23].

Conclusion

Clean, comprehensive data is no longer a nice-to-have; it is the underpinning of reliable AI.
Utilizing tools such as Ataccama for anomaly detection, TensorFlow Data Validation for validation, and active learning practices to reduce bias can help prevent AI failures. As real-time analytics and edge computing continue to take off in 2025, developers will need to prioritize three things:

Automated monitoring (e.g., Datafold for feature drift) [14][15]
Ethical audits to mitigate bias [9][13]
Synthetic data for compliance [21]

Together, these strategies help ensure that enterprises get reliable, unbiased, and efficient outcomes from their AI systems [24].

References

1. Fivetran: $406M Losses
2. Agility PR: AI Investment Losses
3. SDxCentral: AI Data Losses
4. CMU: Amazon Hiring Bias
5. ACLU: Amazon Bias
6. Reuters: Amazon AI Bias
7. Cmwired: Microsoft Tay
8. Harvard: Tay Case Study
9. Opinosis: Tay Data Issues
10. Harvard Ethics: IBM Watson Failure
11. CaseCentre: IBM Watson Failure
12. Keymakr: AI Data Challenges
13. Shelf.io: Data Quality Impact
14. Datafold: Regression Testing
15. Datafold: Data Quality Tools
16. IBM Knowledge Catalog
17. IBM Knowledge Catalog
18. SDxCentral: AI Data Losses
19. Ataccama: Data Quality Platform
20. Ataccama: Anomaly Detection
21. EconOne: AI Data Quality
22. Infosys: GenAI Data Quality
23. CaseCentre: IBM Watson Failure
24. SDxCentral: AI Data Losses
In the world of data analysis, "slow queries" are like workplace headaches that just won't go away. Recently, I've met quite a few data analysts who complain about queries running for hours without results, leaving them staring helplessly at the spinning progress bar. Last week, I ran into an old friend who was struggling with the performance of a large table JOIN. "The query speed is slower than a snail, and my boss is driving me crazy..." he said with a frustrated look. As a seasoned database optimization expert with years of experience on the front lines, I couldn't help but smile: "JOIN performance is slow because you don't understand its nature. Just like in martial arts, understanding how to use force effectively can make all the difference." Today, let's dive into the world of JOIN operations in Doris and see how you can transform your queries into "lightning-fast" operations that will impress your boss! Doris JOIN Secret: Performance Optimization Starts With Choosing the Right JOIN Strategy Data Analyst Xiao Zhang recently encountered a tricky problem. He was working on a large-scale data analysis task that required joining several massive tables. Initially, he used the most conventional JOIN method, but the query speed was painfully slow, taking hours to complete. This left him in a tough spot — his boss needed the report urgently, and the pressure was mounting. Xiao Zhang turned to his old friend and Doris expert, Old Li, for help. Old Li smiled and said, "The key to JOIN performance is choosing the right JOIN strategy. Doris supports multiple JOIN implementations, and I'll share some secrets with you." The Essence of JOIN In distributed databases, JOIN operations may seem simple, but they actually involve hidden complexities. They not only need to join tables but also coordinate data flow and computation in a distributed environment. For example, you have two large tables distributed across different nodes, and to perform a JOIN, you need to solve a core problem — how to bring the data to be joined together. This involves data redistribution strategies. Doris' JOIN Arsenal Doris employs two main physical implementations for JOIN operations — Hash Join and Nest Loop Join. Hash Join is like a martial artist's swift sword, completing joins with lightning speed; Nest Loop Join is like a basic skill for a swordsman, simple but widely applicable. Hash Join builds a hash table in memory and can quickly complete equi-JOIN operations. It's like finding teammates in a game of League of Legends — each player is assigned a unique ID, and when needed, the system locates them directly, naturally achieving high efficiency. Nest Loop Join, on the other hand, uses the most straightforward method — iteration. It's like visiting relatives during the Spring Festival, going door-to-door, which is slower but ensures that no one is missed. This method is applicable to all JOIN scenarios, including non-equi-JOINs. The Four Data Distribution Strategies in the JOIN World As a distributed MPP database, Apache Doris needs to shuffle data during the Hash Join process to ensure the correctness of the JOIN results. Old Li pulled out a diagram and showed Xiao Zhang the four data distribution strategies for JOIN operations in Doris: Broadcast Join: The Dominant Player Broadcast Join is like a domineering CEO who replicates the right table's data to every compute node. It's simple and brutal, widely applicable. When the right table's data volume is small, this method is efficient. 
Network overhead grows linearly with the number of nodes. As shown in the figure, Broadcast Join involves sending all the data from the right table to all nodes participating in the JOIN computation, including the nodes scanning the left table data, while the left table data remains stationary. Each node receives a complete copy of the right table's data (total data volume T(R)), ensuring that all nodes have the necessary data to perform the JOIN operation. This method is suitable for a variety of general scenarios but is not applicable to RIGHT OUTER, RIGHT ANTI, and RIGHT SEMI types of Hash Join. The network overhead is the number of JOIN nodes N multiplied by the right table's data volume T(R). Partition Shuffle: The Conventional Approach Partition Shuffle employs a bidirectional distribution strategy, hashing and distributing both tables based on the JOIN key. The network overhead equals the sum of the data volumes of the two tables. This method is like Tai Chi, emphasizing balance, and is suitable for scenarios where the data volumes of the two tables are similar. This method involves calculating hash values based on the JOIN condition and partitioning the data accordingly. Specifically, the data from both the left and right tables is partitioned based on the hash values calculated from the JOIN condition and then sent to the corresponding partition nodes (as shown in the figure). The network overhead for this method includes the cost of transmitting the left table's data T(S) and the cost of transmitting the right table's data T(R). This method only supports Hash Join operations because it relies on the JOIN condition to perform data partitioning. Bucket Shuffle: The Cost-Effective Approach Bucket Shuffle leverages the bucketing characteristics of the tables, requiring redistribution of only the right table's data. The network overhead is merely the data volume of the right table. It's like a martial artist using the opponent's strength against them, making good use of their advantages. This method is particularly efficient when the left table is already bucketed by the JOIN key. When the JOIN condition includes the bucketing column of the left table, the left table's data remains stationary, and the right table's data is redistributed to the left table's nodes for JOIN, reducing network overhead. When one side of the table participating in the JOIN operation has data that is already hash-distributed according to the JOIN condition column, we can choose to keep the data location of this side unchanged and only move and redistribute the other side of the table. (Here, the "table" is not limited to physically stored tables but can also be the output result of any operator in an SQL query, and we can flexibly choose to keep the data location of the left or right table unchanged and only move the other side.) Taking Doris' physical tables as an example, since the table data is stored through hash distribution calculations, we can directly use this characteristic to optimize the data shuffle process during JOIN operations. Suppose we have two tables that need to be JOINed, and the JOIN column is the bucketing column of the left table. In this case, we do not need to move the left table's data; we only need to redistribute the right table's data according to the left table's bucketing information to complete the JOIN calculation (as shown in the figure). The network overhead for this process mainly comes from the movement of the right table's data, i.e., T(R). 
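To see what that precondition looks like in practice, here is a minimal sketch using hypothetical tables (orders and user_profiles, with illustrative column names): because the left table is bucketed by hash on the join key, only the right table needs to be redistributed, which is exactly the Bucket Shuffle case described above.

SQL

-- The left table is bucketed on the join key (user_id).
CREATE TABLE orders (
    user_id  BIGINT,
    order_id BIGINT,
    amount   DECIMAL(10, 2)
)
DUPLICATE KEY(user_id, order_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16
PROPERTIES ("replication_num" = "1");

CREATE TABLE user_profiles (
    user_id BIGINT,
    city    VARCHAR(64)
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 16
PROPERTIES ("replication_num" = "1");

-- The join condition includes the left table's bucketing column, so the
-- planner can keep orders in place and shuffle only user_profiles.
SELECT o.order_id, o.amount, u.city
FROM orders o
JOIN user_profiles u ON o.user_id = u.user_id;

Whether Bucket Shuffle is actually chosen also depends on the planner and session settings, so it is worth confirming with EXPLAIN, as this article does later for Colocate Join.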
Colocate Join: The Ultimate Expert Colocate Join is the ultimate optimization, where data is pre-distributed in the same manner, and no data movement is required during JOIN. It's like a perfectly synchronized partnership, with seamless cooperation. Zero network overhead and optimal performance, but it has the strictest requirements. Similar to Bucket Shuffle Join, if both sides of the table participating in the JOIN are hash-distributed according to the JOIN condition column, we can skip the shuffle process and directly perform the JOIN calculation locally. Here is a simple illustration using physical tables: When Doris creates tables with DISTRIBUTED BY HASH, the data is distributed according to the hash distribution key during data import. If the hash distribution key of the two tables happens to match the JOIN condition column, we can consider that the data of these two tables has been pre-distributed according to the JOIN requirements, i.e., no additional shuffle operation is needed. Therefore, during actual querying, we can directly perform the JOIN calculation on these two tables. Colocate Join Example After introducing the four types of Hash Join in Doris, let's pick Colocate Join, the ultimate expert, for a showdown: In the example below, both tables t1 and t2 have been processed by the GROUP BY operator and output new tables (at this point, both tx and ty are hash-distributed according to c2). The subsequent JOIN condition is tx.c2 = ty.c2, which perfectly meets the conditions for Colocate Join. SQL explain select * from ( -- Table t1 is hash-distributed by c1. After the GROUP BY operator, the data distribution becomes hash-distributed by c2. select c2 as c2, sum(c1) as c1 from t1 group by c2 ) tx join ( -- Table t2 is hash-distributed by c1. After the GROUP BY operator, the data distribution becomes hash-distributed by c2. select c2 as c2, sum(c1) as c1 from t2 group by c2 ) ty on tx.c2 = ty.c2; From the Explain execution plan result below, we can see that the left child node of the 8th Hash Join node is the 7th aggregation node, and the right child node is the 3rd aggregation node, with no Exchange node appearing. This indicates that the data from both the left and right child nodes after aggregation remains in its original location without any data movement and can directly perform the subsequent Hash Join operation locally. 
SQL +------------------------------------------------------------+ | Explain String(Nereids Planner) | +------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS: | | c2[#20] | | c1[#21] | | c2[#22] | | c1[#23] | | PARTITION: HASH_PARTITIONED: c2[#10] | | | | HAS_COLO_PLAN_NODE: true | | | | VRESULT SINK | | MYSQL_PROTOCAL | | | | 8:VHASH JOIN(373) | | | join op: INNER JOIN(PARTITIONED)[] | | | equal join conjunct: (c2[#14] = c2[#6]) | | | cardinality=10 | | | vec output tuple id: 9 | | | output tuple id: 9 | | | vIntermediate tuple ids: 8 | | | hash output slot ids: 6 7 14 15 | | | final projections: c2[#16], c1[#17], c2[#18], c1[#19] | | | final project output tuple id: 9 | | | distribute expr lists: c2[#14] | | | distribute expr lists: c2[#6] | | | | | |----3:VAGGREGATE (merge finalize)(367) | | | | output: sum(partial_sum(c1)[#3])[#5] | | | | group by: c2[#2] | | | | sortByGroupKey:false | | | | cardinality=5 | | | | final projections: c2[#4], c1[#5] | | | | final project output tuple id: 3 | | | | distribute expr lists: c2[#2] | | | | | | | 2:VEXCHANGE | | | offset: 0 | | | distribute expr lists: | | | | | 7:VAGGREGATE (merge finalize)(354) | | | output: sum(partial_sum(c1)[#11])[#13] | | | group by: c2[#10] | | | sortByGroupKey:false | | | cardinality=10 | | | final projections: c2[#12], c1[#13] | | | final project output tuple id: 7 | | | distribute expr lists: c2[#10] | | | | | 6:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 1 | | | | PARTITION: HASH_PARTITIONED: c1[#8] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 06 | | HASH_PARTITIONED: c2[#10] | | | | 5:VAGGREGATE (update serialize)(348) | | | STREAMING | | | output: partial_sum(c1[#8])[#11] | | | group by: c2[#9] | | | sortByGroupKey:false | | | cardinality=10 | | | distribute expr lists: c1[#8] | | | | | 4:VOlapScanNode(345) | | TABLE: tt.t1(t1), PREAGGREGATION: ON | | partitions=1/1 (t1) | | tablets=1/1, tabletList=491188 | | cardinality=21, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 2 | | | | PARTITION: HASH_PARTITIONED: c1[#0] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 02 | | HASH_PARTITIONED: c2[#2] | | | | 1:VAGGREGATE (update serialize)(361) | | | STREAMING | | | output: partial_sum(c1[#0])[#3] | | | group by: c2[#1] | | | sortByGroupKey:false | | | cardinality=5 | | | distribute expr lists: c1[#0] | | | | | 0:VOlapScanNode(358) | | TABLE: tt.t2(t2), PREAGGREGATION: ON | | partitions=1/1 (t2) | | tablets=1/1, tabletList=491198 | | cardinality=10, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | | | Statistics | | planed with unknown column statistics | +------------------------------------------------------------+ 105 rows in set (0.06 sec) The Path to JOIN Decision After listening to Old Li's explanation, Xiao Zhang had an epiphany. JOIN optimization is not about simply choosing one solution; it's about making flexible decisions based on the actual situation: Joining a large table with a small table? Go all-in with Broadcast Join. Tables of similar size? Partition Shuffle is a safe bet. Left table bucketed appropriately? Bucket Shuffle shows its power. Joining tables in the same group? Colocate Join leads the way. Non-equi-JOIN? Nest Loop Join comes to the rescue. 
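Building on the decision guide above, Doris also lets you steer the distribution strategy explicitly with join hints when the planner's automatic choice is not ideal. The following sketch uses hypothetical table names; treat it as an illustration of the hint syntax rather than a recommendation for any particular workload.

SQL

-- Force a Broadcast Join: replicate the (small) right table to every node.
SELECT o.order_id, d.region_name
FROM orders o JOIN [broadcast] dim_region d ON o.region_id = d.region_id;

-- Force a Partition (shuffle) Join: hash-redistribute both sides by the join key.
SELECT o.order_id, c.segment
FROM orders o JOIN [shuffle] customers c ON o.customer_id = c.customer_id;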
After putting these strategies into practice, Xiao Zhang compiled a set of JOIN optimization tips: Plan data distribution in advance to lay the foundation for high-performance JOIN operations.Leverage partition pruning to reduce the amount of data involved in JOINs.Design bucketing strategies wisely to create favorable conditions for Bucket Shuffle and Colocate Join.Configure parallelism and memory properly to maximize JOIN performance.Monitor resource usage closely to identify performance bottlenecks in a timely manner. By optimizing queries based on these insights, Xiao Zhang was able to reduce tasks that originally took hours to just a few minutes, earning high praise from his boss. The world of JOIN optimization is vast and complex; what we've explored today is just the tip of the iceberg. Choosing the right JOIN strategy for different scenarios is key to finding the optimal balance between performance and resource consumption.
Effective management of large datasets is paramount for both performance and cost optimization. Oracle 19c introduces Hybrid Partitioned Tables (HPT), a feature that allows you to distribute table partitions across multiple storage tiers — from high-performance on-premises systems to cost-effective cloud-based object storage. This approach enables organizations to store frequently accessed “hot” data locally for faster processing while archiving less-active “cold” data in the cloud. Learn how to implement Hybrid Partitioned Tables (HPT) to streamline data management, boost query performance, and cut storage costs. What Are Hybrid Partitioned Tables? Hybrid Partitioned Tables enable you to: Store hot data (frequently accessed) in local database storage.Store cold data (infrequently accessed) in external object storage, such as Oracle Cloud Infrastructure (OCI) Object Storage.Query both internal and external partitions seamlessly without application changes. This approach is ideal for organizations looking to optimize storage costs while maintaining high performance for critical data. Step-by-Step Guide to Implementing Hybrid Partitioned Tables Step 1: Prerequisites Oracle Database 19c installed.Access to Oracle Cloud Infrastructure (OCI) Object Storage (or any compatible external storage).A bucket was created in OCI Object Storage to store external partitions. Dataflow-hybrid partition Step 2: Create a Hybrid Partitioned Table Let’s create a hybrid partitioned table to store sales data. Recent sales data will be stored locally, while older data will be stored in OCI Object Storage. SQL -- Create a Hybrid Partitioned Table CREATE TABLE sales ( sale_id NUMBER, sale_date DATE, amount NUMBER ) PARTITION BY RANGE (sale_date) ( PARTITION p2023_q1 VALUES LESS THAN (TO_DATE('2023-04-01', 'YYYY-MM-DD')), PARTITION p2023_q2 VALUES LESS THAN (TO_DATE('2023-07-01', 'YYYY-MM-DD')), PARTITION p2023_q3 VALUES LESS THAN (TO_DATE('2023-10-01', 'YYYY-MM-DD')), PARTITION p2023_q4 VALUES LESS THAN (TO_DATE('2024-01-01', 'YYYY-MM-DD')) ) HYBRID ( PARTITION p2023_q1 EXTERNAL LOCATION ( 'https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/bucket/o/p2023_q1.csv' ), PARTITION p2023_q2 EXTERNAL LOCATION ( 'https://objectstorage.us-ashburn-1.oraclecloud.com/n/namespace/b/bucket/o/p2023_q2.csv' ) ); Step 3: Load Data into Partitions Load recent data into internal partitions: SQL INSERT INTO sales VALUES (1, TO_DATE('2023-01-15', 'YYYY-MM-DD'), 1000); INSERT INTO sales VALUES (2, TO_DATE('2023-06-20', 'YYYY-MM-DD'), 1500); Load older data into external partitions (upload CSV files to OCI Object Storage): SQL -- Example CSV content for p2023_q1.csv: -- 3,2023-02-10,2000 -- 4,2023-03-25,2500 Step 4: Query the Hybrid Partitioned Table Query the table to retrieve data from both internal and external partitions: SQL SELECT * FROM sales WHERE sale_date BETWEEN TO_DATE('2023-01-01', 'YYYY-MM-DD') AND TO_DATE('2023-12-31', 'YYYY-MM-DD'); Adding and Dropping Partitions Adding a Partition Use the statement to add a new partition to the hybrid partitioned table. For example, to add a partition for Q1 2024: SQL ALTER TABLE sales ADD PARTITION p2024_q1 VALUES LESS THAN (TO_DATE('2024-04-01', 'YYYY-MM-DD')); Dropping a Partition To drop an existing partition, use the ALTER TABLE ... DROP PARTITION statement. 
For example, to drop the Q1 2023 partition:

SQL

ALTER TABLE sales DROP PARTITION p2023_q1;

Adding an External Partition

To add an external partition, specify the external location:

SQL

ALTER TABLE sales ADD PARTITION p2023_q1 VALUES LESS THAN (TO_DATE('2023-04-01', 'YYYY-MM-DD')) EXTERNAL DEFAULT DIRECTORY tmp_dir LOCATION ('p2023_q1.csv');

Dropping an External Partition

Dropping an external partition works the same way as dropping an internal partition:

SQL

ALTER TABLE sales DROP PARTITION p2023_q1;

Simulation Scenario: Performance and Cost Benefits

Scenario Setup

Table size: 1 million rows (500,000 in internal storage, 500,000 in external storage).
Query pattern: 80% of queries access recent data (internal partitions); 20% of queries access historical data (external partitions).

Performance Metrics

Metric | Internal Partitions | External Partitions
Query response time (avg) | 0.5 seconds | 2.5 seconds
Storage cost (per GB/month) | $0.10 (on-premises) | $0.02 (cloud)

Cost Savings

Storing 500,000 rows in external storage reduces the monthly storage cost for that data by 80% compared to keeping it on-premises.

Restrictions on Hybrid Partitioned Tables

While Hybrid Partitioned Tables offer significant flexibility, there are some restrictions to be aware of:

Partitioning types: Only RANGE and LIST partitioning are supported; REFERENCE and SYSTEM partitioning are not.
DML operations: DML (INSERT, UPDATE, DELETE) is allowed only on internal partitions. Attempting to modify an external partition results in an error:

SQL

INSERT INTO sales VALUES (10, TO_DATE('2023-02-20', 'YYYY-MM-DD'), 750);
-- ERROR: ORA-14466: Data in a read-only partition or subpartition cannot be modified.

External partition limitations: External partitions are read-only, and their data must be stored in flat files (e.g., CSV) in object storage.
Other restrictions: Certain operations, such as splitting or merging partitions, may have limitations depending on the partitioning type and storage tier.

Best Practices for Hybrid Partitioned Tables

Data lifecycle management: Use Oracle's Automatic Data Optimization (ADO) to automate the movement of data between storage tiers based on access patterns.
Partition design: Choose a partitioning strategy (e.g., range, list) that aligns with your data access patterns.
Monitoring: Regularly monitor query performance and storage costs using Oracle's data dictionary views (e.g., DBA_HYBRID_PART_TABLES).

Conclusion

Hybrid Partitioned Tables in Oracle 19c provide a flexible, cost-effective solution for managing large datasets. By storing hot data locally and cold data in external storage, organizations can achieve significant cost savings without compromising performance. Follow the steps in this article to implement HPT and optimize your data storage strategy.
If your company has budget constraints, purchasing licensed products like Splunk for logging infrastructure may not be feasible. Fortunately, a powerful open-source alternative exists: ELK (Elasticsearch, Logstash, and Kibana). ELK offers robust logging and visualization capabilities. At a startup where I worked, cost minimization was a priority, so I implemented ELK for logging. In this article, I'll guide you through setting up and configuring the free version of the ELK stack on GCP using Terraform and Ansible. However, the same instructions can be followed to deploy it on other cloud platforms like AWS and Azure. Why Choose ELK? After thorough research, I decided to implement the ELK stack on GCP using virtual machines (VMs) for logging due to its ease of use, rich dashboards, and straightforward setup process. While I could have deployed it on a GKE cluster, I opted for VMs at the time for various reasons. Elasticsearch is an open-source search and analytics engine that allows you to collect and analyze logs from multiple sources, including IoT devices, application servers, web servers, and cloud services. The ELK stack consists of the following components: Elasticsearch – Stores and indexes log data Logstash – Filters and formats logs before ingestion Kibana – Provides a graphical user interface (GUI) for searching and visualizing logs Filebeat – A lightweight log shipper installed as an agent on machines generating logs Figure 1 Prerequisites Before setting up ELK, ensure you have the following: A cloud account (Google Cloud, AWS, or Azure). This guide uses GCP. Terraform and Ansible installed on your local machine. Proper authentication configured between your local machine and the cloud provider (Google Cloud or any other) with the required access permissions for Terraform and Ansible. Part 1: ELK Infrastructure Setup Using Terraform on GCP The ELK stack consists of various nodes, each serving a specific function to enhance scalability and failover: Master nodes – Manage cluster operations and indexing. Data nodes – Store and index log data for search and analysis. Kibana node – Provides a GUI for log visualization and analytics. Logstash node – Filters, transforms, and ingests logs from various sources. While all functionalities can be combined on a single node, separating them in a production environment improves scalability and fault tolerance, depending on the workload. Create the following files in a folder where you plan to run the Terraform code, or clone my Git repository, which contains all the code: GitHub - pradeep-gaddamidi/ELK. 1. create_elk_instances.tf JSON locals { config = var.environment_config[terraform.workspace] instances = [for key, value in local.config.nodes : { name = key machine_type = ( can(regex("master_.*", value)) ? local.config.master_machine_type : can(regex("kibana_.*", value)) ? local.config.kibana_machine_type : can(regex("logstash_.*", value)) ? local.config.logstash_machine_type : local.config.node_machine_type ) zone = ( can(regex(".*_zoneb", value)) ? local.config.region_zones[1] : can(regex(".*_zonec", value)) ? local.config.region_zones[2] : local.config.region_zones[0] ) network_tags = local.config.network_tags ssh_keys = local.config.ssh_keys static_ip_name = key # Modify or leave null as needed service_account_name = "elastic" # Modify or leave null as needed disk_name = key # Modify or leave null as needed disk_type = "pd-standard" # Modify as needed disk_size = ( can(regex("master_.*", value)) ? 
local.config.master_disk_size : can(regex("kibana_.*", value)) ? local.config.kibana_disk_size : can(regex("logstash_.*", value)) ? local.config.logstash_disk_size : local.config.node_disk_size ) disk_zone = ( can(regex(".*_zoneb", value)) ? local.config.region_zones[1] : can(regex(".*_zonec", value)) ? local.config.region_zones[2] : local.config.region_zones[0] ) disk_project = local.config.project_name }] } module "gcp_instance" { source = "../../modules/gcp_custom_instance" gce_image = local.config.gce_image subnet = local.config.subnet region = local.config.region # Provide only when creating static IPS instances = local.instances use_common_service_account = local.config.use_common_service_account # Provide only when creating a common service account accross all the instances } 2. variables.tf JSON variable "environment_config" { description = "Configuration per environment" type = map(object({ project_name = string region = string region_zones = list(string) master_machine_type = string node_machine_type = string kibana_machine_type = string logstash_machine_type= string network_tags = list(string) network = string subnet = string gce_image = string ca_bucket_location = string backup_bucket = string master_disk_size = number node_disk_size = number kibana_disk_size = number logstash_disk_size = number use_common_service_account = bool machine_access_scopes= list(string) nodes = map(string) ssh_keys = list(string) })) default = { nonprod = { project_name = "nonprod-infra-monitoring" region = "us-central1" region_zones = ["us-central1-a", "us-central1-b"] master_machine_type = "n1-standard-2" node_machine_type = "n1-standard-2" kibana_machine_type = "n1-standard-2" logstash_machine_type= "n1-standard-2" network_tags = ["elastic", "nonprod"] network = "projects/nonprod-networking/global/networks/nonprod-vpc" subnet = "projects/nonprod-networking/regions/us-central1/subnetworks/nonprod-sub01" gce_image = "debian-cloud/debian-12" ca_bucket_location = "nonprod-elastic-certificates" backup_bucket = "nonprod-elastic-backup" master_disk_size = 100 node_disk_size = 510 kibana_disk_size = 100 logstash_disk_size = 100 use_common_service_account = true machine_access_scopes = ["cloud-platform"] ssh_keys = [] nodes = { "nonprod-elastic-master-node1" = "master_zonea" "nonprod-elastic-data-node1" = "data_zonea" "nonprod-elastic-data-node2" = "data_zoneb" "nonprod-elastic-kibana" = "kibana_zonea" "nonprod-elastic-logstash" = "logstash_zonea" } } prod = { project_name = "prod-infra-monitoring" region = "us-central1" region_zones = ["us-central1-a", "us-central1-b", "us-central1-c"] master_machine_type = "n2-standard-2" node_machine_type = "n2-highmem-4" kibana_machine_type = "n2-standard-2" logstash_machine_type= "n2-standard-2" network_tags = ["elastic", "prod"] network = "projects/prod-networking/global/networks/prod-vpc" subnet = "projects/prod-networking/regions/us-central1/subnetworks/prod-sub01" gce_image = "debian-cloud/debian-12" ca_bucket_location = "prod-elastic-certificates" backup_bucket = "prod-elastic-backup" master_disk_size = 100 node_disk_size = 3000 kibana_disk_size = 100 logstash_disk_size = 100 use_common_service_account = true machine_access_scopes = ["cloud-platform"] ssh_keys = [] nodes = { "elastic-master-node1" = "master_zonea" "elastic-master-node2" = "master_zoneb" "elastic-master-node3" = "master_zonec" "elastic-data-node1" = "data_zonea" "elastic-data-node2" = "data_zonea" "elastic-data-node3" = "data_zoneb" "elastic-data-node4" = "data_zoneb" "elastic-data-node5" = 
"data_zonea" "elastic-data-node6" = "data_zoneb" "elastic-kibana" = "kibana_zonea" "elastic-logstash" = "logstash_zonea" "elastic-logstash2" = "logstash_zoneb" "elastic-logstash3" = "logstash_zonec" } } } } I have created a custom module to provision GCP instances and used it in the create_elk_instances.tf file. However, you can also use GCP's official Terraform module to create VM instances. JSON module "gcp_instance" { source = "./modules/gcp_custom_instance" The ./modules/gcp_custom_instance folder must have the files, gcp_custom_vm.tf and variables_custom.tf). Below is the code for my custom module: 3. gcp_custom_vm.tf JSON locals { common_service_account_email = var.use_common_service_account ? google_service_account.common_service_account[0].email : null } resource "google_compute_instance" "google-compute-instance" { for_each = { for index, inst in var.instances : inst.name => inst } name = each.value.name machine_type = each.value.machine_type zone = each.value.zone # allow_stopping_for_update = true tags = each.value.network_tags metadata = { ssh-keys = join("\n", each.value.ssh_keys) } boot_disk { initialize_params { image = var.gce_image } } network_interface { subnetwork = var.subnet network_ip = each.value.static_ip_name != null ? google_compute_address.static_ips[each.value.static_ip_name].address : null } dynamic "service_account" { for_each = each.value.service_account_name != null ? [1] : [] content { scopes = var.machine_access_scopes email = var.use_common_service_account ? google_service_account.common_service_account[0].email : google_service_account.individual_service_account[each.value.name].email } } dynamic "attached_disk" { for_each = each.value.disk_name != null ? [1] : [] content { source = google_compute_disk.google-compute-disk[each.value.disk_name].self_link device_name = "${each.value.disk_name}-data" mode = "READ_WRITE" } } } resource "google_compute_disk" "google-compute-disk" { for_each = { for index, inst in var.instances : inst.disk_name => inst if inst.disk_name != null } name = "${each.value.disk_name}-data" type = each.value.disk_type size = each.value.disk_size zone = each.value.disk_zone project = each.value.disk_project } resource "google_service_account" "common_service_account" { count = var.use_common_service_account ? 1 : 0 account_id = var.use_common_service_account ? lookup(var.instances[0], "service_account_name", null) : null display_name = "Service Account" } resource "google_service_account" "individual_service_account" { for_each = { for index, inst in var.instances : inst.service_account_name => inst if inst.service_account_name != null && !var.use_common_service_account } account_id = each.value.service_account_name display_name = "Service account for ${each.value.name}" } resource "google_compute_address" "static_ips" { # Only include instances that have static_ip_name defined for_each = { for index, inst in var.instances : inst.static_ip_name => inst if inst.static_ip_name != null } name = each.value.static_ip_name address_type = "INTERNAL" region = var.region subnetwork = var.subnet } output "common_service_account_email" { value = local.common_service_account_email description = "The email of the common service account" } 4. 
variables_custom.tf JSON variable "instances" { description = "List of instance configurations" type = list(object({ name = string machine_type = string zone = string network_tags = optional(list(string)) ssh_keys = optional(list(string)) static_ip_name = optional(string) service_account_name = optional(string) disk_name = optional(string) disk_type = optional(string) disk_size = optional(number) disk_zone = optional(string) disk_project = optional(string) })) } variable "gce_image" { description = "GCE image for the instances" type = string default = "debian-cloud/debian-12" } variable "subnet" { description = "Subnet for the network" type = string } variable "region" { description = "GCP region" type = string default = "us-central1" } variable "use_common_service_account" { description = "Flag to determine if a common service account should be used for all instances" type = bool default = false } variable "machine_access_scopes" { description = "Scopes for machine access" type = list(string) default = ["cloud-platform"] } Assign permissions to the service accounts created earlier in the code: JSON locals { bucket_config = var.environment_config[terraform.workspace] } resource "google_storage_bucket_iam_binding" "elastic-backup" { bucket = local.bucket_config.backup_bucket role = "roles/storage.objectAdmin" members = local.config.use_common_service_account ? ["serviceAccount:${module.gcp_instance.common_service_account_email}"] : [] } resource "google_storage_bucket_iam_binding" "elastic-certs" { bucket = local.bucket_config.ca_bucket_location role = "roles/storage.objectViewer" members = local.config.use_common_service_account ? ["serviceAccount:${module.gcp_instance.common_service_account_email}"] : [] } Create the GCP buckets used for certificates and elastic backups: JSON resource "google_storage_bucket" "elastic-backup" { name = local.bucket_config.backup_bucket location = "US" storage_class = "STANDARD" uniform_bucket_level_access = true } resource "google_storage_bucket" "elastic-certs" { name = local.bucket_config.ca_bucket_location location = "US" storage_class = "STANDARD" uniform_bucket_level_access = true } You can use the below Terraform commands to create the above resources: JSON terraform workspace set nonprod (if you use workspaces) terraform init terraform plan terraform apply You can add new nodes as needed by updating variables, i.e., adding new nodes to the nodes section of the file and re-running the Terraform code. This will provision the new data nodes automatically. Now that the ELK infrastructure is set up, the next step is to install and configure the ELK software. Part 2: Configure the ELK Infrastructure Using Ansible Prerequisites 1. The certificate generation required for secure communication between various Elastic nodes can be automated. However, I chose to generate them manually by following the ELK guides. Set up basic security for the Elastic Stack Set up basic security for the Elastic Stack plus secured HTTPS traffic Once the certificates are generated, stage them on the GCP bucket elastic-certificates. 2. Make sure your Ansible hosts files are organized as below: All data and master nodes are grouped under the elastic section Kibana nodes under kibana section Logstash nodes under logstash Data nodes under data Master nodes under master Create the following files in a folder where you plan to run the Ansible playbook. Then, execute the Ansible playbook below to install and configure ELK. 
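Before running it, you will also need an inventory that reflects the host grouping described in the prerequisites. The following is only a sketch (a hypothetical inventory.yaml): the host names are borrowed from the nonprod Terraform example, and the IPs are placeholders for your instances' internal addresses.

YAML

all:
  children:
    elastic:
      children:
        master:
          hosts:
            nonprod-elastic-master-node1:
              ansible_host: 10.x.x.1
        data:
          hosts:
            nonprod-elastic-data-node1:
              ansible_host: 10.x.x.2
            nonprod-elastic-data-node2:
              ansible_host: 10.x.x.3
    kibana:
      hosts:
        nonprod-elastic-kibana:
          ansible_host: 10.x.x.4
    logstash:
      hosts:
        nonprod-elastic-logstash:
          ansible_host: 10.x.x.5

With the inventory in place, the playbook can be run with ansible-playbook -i inventory.yaml ansible.yaml.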
ansible.yaml YAML --- - name: Install Elasticsearch pre-reqs on Debian hosts: all become: yes tasks: - name: Update apt repository apt: update_cache: yes - name: Install default-jre apt: name: - default-jre state: present - name: Add Elasticsearch GPG key apt_key: url: https://artifacts.elastic.co/GPG-KEY-elasticsearch state: present - name: Install apt-transport-https apt: name: apt-transport-https state: present - name: Add Elasticsearch repository apt_repository: repo: "deb https://artifacts.elastic.co/packages/8.x/apt stable main" state: present filename: elastic-8.x - name: Update apt repository apt: update_cache: yes - name: Install Elasticsearch on Debian hosts: elastic become: yes tasks: - name: Install Elasticsearch apt: name: elasticsearch=8.11.2 state: present - name: Enable Elasticsearch service ansible.builtin.systemd: name: elasticsearch.service enabled: yes - name: Install Kibana on Debian hosts: kibana become: yes tasks: - name: Install Kibana apt: name: kibana=8.11.2 state: present - name: Enable kibana service ansible.builtin.systemd: name: kibana.service enabled: yes - name: Install logstash on Debian hosts: logstash become: yes tasks: - name: Install logstash apt: name: logstash=1:8.11.2-1 state: present - name: Enable logstash service ansible.builtin.systemd: name: logstash.service enabled: yes - name: Copy the kibana.yml configuration file to the kibana nodes hosts: kibana become: yes tasks: - name: Copy a kibana.yml file template: src: "{{ playbook_dir }/files/kibana.j2" dest: /etc/kibana/kibana.yml - name: Copy the pipelines.yml configuration file to the logstash nodes hosts: logstash become: yes tasks: - name: Copy a logstash pipelines.yml file template: src: "{{ playbook_dir }/files/logstash.j2" dest: /etc/logstash/conf.d/pipelines.conf - name: Copy the elasticsearch_node.yml configuration file to the nodes hosts: data gather_facts: yes become: yes tasks: - name: Get zone info from metadata server ansible.builtin.uri: url: http://metadata.google.internal/computeMetadata/v1/instance/zone method: GET return_content: yes # Ensures that the content is returned headers: Metadata-Flavor: "Google" register: zone_info check_mode: no - name: Extract the zone name set_fact: zone_name: "{{ zone_info.content.split('/')[-1] }" - name: Copy a elasticsearch_node.yml file template: src: "{{ playbook_dir }/files/elasticsearch_node.j2" dest: /etc/elasticsearch/elasticsearch.yml - name: Copy the elasticsearch_node.yml configuration file to the nodes hosts: master gather_facts: yes become: yes tasks: - name: Copy a elasticsearch_master.yml file template: src: "{{ playbook_dir }/files/elasticsearch_master.j2" dest: /etc/elasticsearch/elasticsearch.yml - name: Download the certificates from the GCS bucket hosts: elastic become: yes tasks: - name: certificates command: gsutil cp gs://nonprod-elastic-certificates/* /etc/elasticsearch/certs - name: Download the certificates from the GCS bucket hosts: kibana become: yes tasks: - name: certificates command: gsutil cp gs://nonprod-elastic-certificates/elasticsearch-ca.pem /etc/kibana - name: Download the certificates from the GCS bucket hosts: logstash become: yes tasks: - name: certificates command: gsutil cp gs://nonprod-elastic-certificates/elasticsearch-ca.pem /usr/share/logstash/pipeline/elasticsearch-ca.pem The configuration files required by the Ansible playbook should be placed in the files directory. The expected files are listed below: 1. 
elasticsearch_master.j2 Jinja2 node.name: {{ ansible_default_ipv4.address }} node.roles: [ master ] discovery.seed_hosts: - 10.x.x.x - 10.x.x.x - 10.x.x.x #cluster.initial_master_nodes: # - 10.x.x.x # - 10.x.x.x # - 10.x.x.x network.host: {{ ansible_default_ipv4.address }} cluster.name: prod-monitoring path: data: /mnt/disks/elasticsearch logs: /var/log/elasticsearch cluster.routing.allocation.awareness.attributes: zone cluster.routing.allocation.awareness.force.zone.values: us-central1-a,us-central1-b xpack.security.http.ssl.enabled: true xpack.security.http.ssl.keystore.path: /etc/elasticsearch/certs/http.p12 xpack.security.enabled: true xpack.security.transport.ssl.enabled: true xpack.security.audit.enabled: true xpack.security.transport.ssl.verification_mode: certificate xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/certs/elastic-certificates.p12 xpack.security.transport.ssl.client_authentication: required xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/certs/elastic-certificates.p12 xpack.license.self_generated.type: basic A few points to be noted about the above elastic master node configuration: We are using a basic (free) license, not a premium one. When Ansible runs on the master node, it automatically fills in the IPv4 address of the master node by default. Uncomment cluster.initial_master_nodes only when creating the cluster for the first time. Security is enabled between master nodes using xpack.security.transport.ssl.enabled, and between data nodes and Kibana/Logstash using xpack.security.http.ssl.enabled. 2. elasticsearch_node.j2 Jinja2 node.name: {{ ansible_default_ipv4.address }} node.roles: [ data, transform, ingest ] discovery.seed_hosts: - 10.x.x.x - 10.x.x.x - 10.x.x.x #cluster.initial_master_nodes: # - 10.x.x.x # - 10.x.x.x # - 10.x.x.x network.host: {{ ansible_default_ipv4.address }} cluster.name: prod-monitoring path: data: /mnt/disks/elasticsearch logs: /var/log/elasticsearch node.attr.zone: {{ zone_name }} xpack.security.http.ssl.enabled: true xpack.security.http.ssl.keystore.path: /etc/elasticsearch/certs/http.p12 xpack.security.enabled: true xpack.security.transport.ssl.enabled: true xpack.security.audit.enabled: true xpack.security.transport.ssl.verification_mode: certificate xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/certs/elastic-certificates.p12 xpack.security.transport.ssl.client_authentication: required xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/certs/elastic-certificates.p12 xpack.license.self_generated.type: basic 3. kibana.j2 Jinja2 elasticsearch.hosts: ["https://10.x.x.x:9200","https://10.x.x.x:9200","https://10.x.x.x:9200","https://10.x.x.x:9200"] server.name: kibana server.host: {{ ansible_default_ipv4.address }} server.port: 443 elasticsearch.username: 'kibana_system' elasticsearch.password: 'somepassxxxxx' elasticsearch.ssl.certificateAuthorities: ['/etc/kibana/elasticsearch-ca.pem'] elasticsearch.ssl.verificationMode: 'certificate' server.ssl.enabled: true server.ssl.certificate: /etc/ssl/kibana/kibana-cert.crt server.ssl.key: /etc/ssl/kibana/kibana-key.key server.publicBaseUrl: https://elastic.company.xyz xpack.encryptedSavedObjects.encryptionKey: zxy123f1318d633817xyz1234 xpack.reporting.encryptionKey: 1xfsyc4ad24176a902f2xyz123 xpack.security.encryptionKey: cskcjsn60e148a70308d39dxyz123 logging: appenders: file: type: file fileName: /var/log/kibana/kibana.log layout: type: json root: appenders: - default - file pid.file: /run/kibana/kibana.pid 4.
logstash.j2 Jinja2 input { beats { port => 5044 } tcp { port => 50000 } tcp { port => 5000 codec => "line" type => "syslog" } http { port => 5050 } google_pubsub { type => "pubsub" project_id => "my-project-123" topic => "cloud_functions_logs" subscription => "cloud_functions_logs-sub" ### json_key_file => "/etc/logstash/keys/logstash-sa.json" codec => "json" } google_pubsub { type => "pubsub" project_id => "my-project-123" topic => "cloud_run_logs" subscription => "cloud_run_logs-sub" ### json_key_file => "/etc/logstash/keys/logstash-sa.json" codec => "json" } } filter { grok { match => { "message" => "^%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}" } } date { match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] target => "@timestamp" } kv { source => "log_message" field_split => ",| " value_split => "=" } mutate { remove_field => [ "timestamp" ] convert => { "pid" => "integer" } } } ### Add your filters / logstash plugins configuration here output { elasticsearch { hosts => ["https://10.x.x.x:9200","https://10.x.x.x:9200","https://10.x.x.x:9200","https://10.x.x.x:9200"] user => "logstash_writer" password => "mypassxyz" index => "logs-my-index-%{+yyyy.MM.dd}" action => "create" ssl => true cacert => '/usr/share/logstash/pipeline/elasticsearch-ca.pem' } } A few points to be noted about the above logstash configuration: In the Logstash configuration above, we use various filters such as grok, date, kv, and mutate to match and modify incoming logs. Adjust according to your needs. In both kibana.j2 and logstash.j2, for "elasticsearch.hosts", you can specify all data nodes as a list, allowing requests to be round-robin distributed across them. Alternatively, configure an internal load balancer with data nodes as the backend and provide just the load balancer's IP. Ensure that the index and logstash_writer users are created via the Kibana console. Additionally, configure the necessary indices to ingest data from other sources like Filebeat and assign proper permissions to the respective users. Data can be ingested into Elasticsearch via Logstash, allowing for necessary filtering, or it can be sent directly to data nodes using agents like Filebeat. If you are storing any of the above .j2 Jinja files in a Git repository, and they contain sensitive information, encrypt them using ansible-vault. Refer to the Ansible documentation to learn more about using ansible-vault. Here is the Filebeat configuration if you want to ship logs directly from Docker applications. You can also use it to ship logs from any other applications. filebeat.conf YAML logging.json: true logging.level: info logging.metrics.enabled: false setup.kibana.host: ${KIBANA_HOST} setup.ilm.enabled: true output.elasticsearch: hosts: ${ELASTIC_HOST} indices: - index: "audit-%{+yyyy.MM.dd}" when.has_fields: ["_audit"] - index: "logs-%{+yyyy.MM.dd}" when.has_fields: ["app", "env"] - index: "invalid-stream-%{+yyyy.MM.dd}" when.has_fields: ["error.data", "error.message"] filebeat.autodiscover: providers: - type: docker templates: - config: - type: container paths: - /var/lib/docker/containers/${data.docker.container.id}/*.log processors: - decode_json_fields: fields: ["message"] process_array: false max_depth: 1 target: "" overwrite_keys: false add_error_key: true Once ELK is set up, you can configure data backups called snapshots to the 'elastic-backup' GCS bucket via the Kibana console. 
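As a sketch of that last step, the backup bucket can be registered as a snapshot repository from Kibana's Snapshot and Restore UI, or with a direct API call like the one below. This assumes the Elasticsearch GCS repository integration (the repository-gcs plugin) is available on the nodes and that credentials for the bucket are configured as described in the Elastic documentation; the host, bucket name, and certificate path shown here are placeholders.
Shell
# Register the GCS bucket as a snapshot repository named "elastic-backup"
curl --cacert /etc/elasticsearch/certs/elasticsearch-ca.pem -u elastic \
  -X PUT "https://10.x.x.x:9200/_snapshot/elastic-backup" \
  -H 'Content-Type: application/json' \
  -d '{ "type": "gcs", "settings": { "bucket": "<your-backup-bucket>", "client": "default" } }'

# Trigger a one-off snapshot into the new repository
curl --cacert /etc/elasticsearch/certs/elasticsearch-ca.pem -u elastic \
  -X PUT "https://10.x.x.x:9200/_snapshot/elastic-backup/snapshot_1?wait_for_completion=true"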
Conclusion Figure 2 With data being ingested from various sources, such as Filebeat, into the Elasticsearch cluster, you can access Kibana's UI to search logs (Figure 2), create visualizations, monitor logs, and set up alerts effectively. By installing and configuring the open-source ELK stack, you can significantly reduce licensing costs while only paying for the GCP infrastructure you use. Terraform and Ansible automation help you get up and running quickly, allowing for easy scaling with minimal effort. Good luck! Feel free to connect with me on LinkedIn.
At 3 a.m., the office is filled only with the dim glow of the computer screens. Data engineer Xiao Ming is struggling with two "heavyweights" — Doris and Hive. "Export, clean, import..." He mechanically repeats these steps between different components, his eyes starting to see stars. This scene is all too common in data teams, making one wonder: Do we really have to manually shuffle data for the rest of our lives? Just then, Doris extended an "olive branch" to Hive — the Hive Catalog made its dazzling debut! It's like arranging a perfect marriage for this "data couple," allowing Doris to directly read and write Hive data, enabling the two systems to "fly together." Whether it's HDFS or object storage, simple queries or complex analyses, one Catalog can handle it all! This amazing feature caught Xiao Ming's attention, and he could finally say goodbye to those cumbersome data synchronization tasks. Let's uncover this "life-saving tool" for data engineers together! The Perfect Encounter of Doris and Hive Late at night, Xiao Ming was staring at the screen, worried. As a data engineer, he faced a tricky problem: The company's data was scattered between Doris and Hive systems, and every cross-system data analysis required manual export and import, which was cumbersome and inefficient. "If only Doris could directly read and write Hive data..." he muttered to himself. Xiao Ming is not the only one with this concern. With the explosive growth of data, enterprise data architectures have become increasingly complex, with data storage scattered across various systems. How to connect these data silos and achieve unified data access and analysis has become a common technical pain point. The good news is that Apache Doris has perfectly solved this problem through the Hive Catalog feature. It's like building a bridge between Doris and Hive, enabling seamless collaboration between the two systems. Starting from version 2.1.3, through the Hive Catalog, Doris can not only query and import data from Hive but also perform operations such as creating tables and writing data back, truly realizing the architecture design of a unified lakehouse. The core value of Hive Catalog lies in its provision of a unified data access layer. For data developers, there is no need to worry about where the data is specifically stored; all data operations can be completed through Doris. For example, you can directly create a Hive table in Doris: SQL CREATE CATALOG hive PROPERTIES ( 'type'='hms', 'hive.metastore.uris' = 'thrift://172.21.16.47:7004' ); Once set up, you can operate Hive tables just like regular Doris tables. Not only does it support queries, but it also allows write operations such as INSERT and CREATE TABLE AS SELECT. The system automatically handles complex details such as partition management and file format conversion. Even more excitingly, Doris provides a comprehensive security mechanism. By integrating Kerberos authentication and Ranger permission management, enterprises do not have to worry about data security issues. Fine-grained access control down to the column level can be achieved to ensure compliance with data access. Now, Xiao Ming finally smiled. With Hive Catalog, his daily work efficiency has improved significantly. Cross-system data analysis has become so simple, as smooth as operating within the same system. This is just the beginning. In the following sections, we will explore more powerful features of Hive Catalog. Let's take a look at the new chapter of Doris + Hive data lakehouse integration! 
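Before moving on, here is a small sketch of what that day-to-day usage can look like once the catalog above exists. The database and table names (sales_db, car_sales, car_sales_summary, car_sales_backup) are hypothetical; the statements rely on Doris's catalog switching and fully qualified catalog.db.table names.
SQL
-- Browse the Hive catalog from Doris
SWITCH hive;
SHOW DATABASES;

-- Query a Hive table as if it were a local Doris table
SELECT count(*) FROM hive.sales_db.car_sales;

-- Write back to Hive (supported since Doris 2.1.3)
INSERT INTO hive.sales_db.car_sales_summary
SELECT brand, count(*) AS cnt
FROM hive.sales_db.car_sales
GROUP BY brand;

-- Or materialize a query result as a new Hive table
CREATE TABLE hive.sales_db.car_sales_backup
AS SELECT * FROM hive.sales_db.car_sales;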
Core Features of Doris-Hive Catalog Xiao Ming recently faced a new challenge. The company's data analysis scenarios are becoming increasingly complex, with both traditional HDFS storage and the introduction of object storage. How can Doris elegantly handle these different storage media? Let's delve into the powerful features of Doris Hive Catalog in a simple and understandable way. Diverse Storage Support Each storage system has its own strengths. HDFS + Hive is suitable for large-scale offline processing of historical full data, while object storage offers high scalability and low-cost advantages... But, Hive Catalog provides a unified access interface, shielding the differences of the underlying storage: SQL -- Connect to S3 CREATE CATALOG hive_s3 PROPERTIES ( "type"="hms", "hive.metastore.uris" = "thrift://172.0.0.1:9083", "s3.endpoint" = "s3.us-east-1.amazonaws.com", "s3.region" = "us-east-1", "s3.access_key" = "ak", "s3.secret_key" = "sk", "use_path_style" = "true" ); -- Optional properties: -- s3.connection.maximum: Maximum number of S3 connections, default 50 -- s3.connection.request.timeout: S3 request timeout, default 3000ms -- s3.connection.timeout: S3 connection timeout, default 1000ms -- Connect to OSS CREATE CATALOG hive_oss PROPERTIES ( "type"="hms", "hive.metastore.uris" = "thrift://172.0.0.1:9083", "oss.endpoint" = "oss.oss-cn-beijing.aliyuncs.com", "oss.access_key" = "ak", "oss.secret_key" = "sk" ); Intelligent Metadata Management Doris employs an intelligent metadata caching mechanism to provide high-performance queries while ensuring data consistency: Local Cache Policy Doris caches table metadata locally to reduce the frequency of access to HMS. When the cache exceeds the threshold, it uses the LRU (Least-Recent-Used) strategy to remove some caches. Smart Refresh [Notification Event Diagram] By subscribing to HMS's Notification Event, Doris can promptly detect metadata changes. For example, you can set the Catalog's timed refresh when creating the Catalog: SQL CREATE CATALOG hive PROPERTIES ( 'type'='hms', 'hive.metastore.uris' = 'thrift://172.0.0.1:9083', 'metadata_refresh_interval_sec' = '3600' ); You can also manually refresh as needed: SQL -- Refresh the specified Catalog. REFRESH CATALOG ctl1 PROPERTIES("invalid_cache" = "true"); -- Refresh the specified Database. REFRESH DATABASE [ctl.]db1 PROPERTIES("invalid_cache" = "true"); -- Refresh the specified Table. REFRESH TABLE [ctl.][db.]tbl1; Enterprise-Level Security Features Security is always a top priority in enterprise data management. Hive Catalog also provides a complete security solution: Ranger Permission Control Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data access management on the Hadoop platform. Doris supports using Apache Ranger for authorization for a specified External Hive Catalog. Currently, it supports Ranger's authorization for databases, tables, and columns, but does not support encryption, row-level permissions, Data Mask, and other functions. You only need to configure the FE environment and add it when creating the Catalog: SQL -- access_controller.properties.ranger.service.name refers to the type of service -- For example, hive, hdfs, etc. It is not the value of ranger.plugin.hive.service.name in the configuration file. 
"access_controller.properties.ranger.service.name" = "hive", "access_controller.class" = "org.apache.doris.catalog.authorizer.ranger.hive.RangerHiveAccessControllerFactory", Kerberos Authentication In addition to integrating with Ranger, Doris Hive Catalog also supports seamless integration with the existing Kerberos authentication system in enterprises. For example: SQL CREATE CATALOG hive_krb PROPERTIES ( 'type'='hms', 'hive.metastore.uris' = 'thrift://172.0.0.1:9083', 'hive.metastore.sasl.enabled' = 'true', 'hive.metastore.kerberos.principal' = 'your-hms-principal', 'hadoop.security.authentication' = 'kerberos', 'hadoop.kerberos.keytab' = '/your-keytab-filepath/your.keytab', 'hadoop.kerberos.principal' = 'your-principal@YOUR.COM', 'yarn.resourcemanager.principal' = 'your-rm-principal' ); Xiao Ming can now flexibly choose storage methods and security modes according to different business needs, truly achieving unified management and efficient analysis of Doris + Hive data. The boundaries between data lakes and data warehouses are blurring, and Doris has built a bridge connecting the two worlds through Hive Catalog. With the continuous evolution of technology, we look forward to seeing more innovative application scenarios.
Infrastructure as code (IaC) automates antiquated manual configurations in the most critical digital landscapes within communities. As the urgency and role of data centers continue to evolve, IaC could become an expectation for building managers. What can software developers learn about the advantages and best practices for using IaC in delicate data center environments? We’re sharing more below. Key Components of Infrastructure as Code Quality IaC management includes several pivotal features. Controlling software is the central fixture. These give workers visibility over their configuration decisions in real time and inform change management if necessary. Infrastructure as code works alongside version control systems, which keep a log of configuration alterations. This promotes a collaborative work environment and holds developers accountable for script shifts. It also permits easy rollbacks if necessary, much like an undo button. Developers also leverage cloud services and application programming interfaces (APIs) to make these assets work harmoniously. These connections are proven to make configuration testing more repeatable in multicloud and hybrid setups. Benefits of Infrastructure as Code for Data Center Management Security experts, coders, and other data center stakeholders must acknowledge the pros of IaC. It maintains the desired state of servers, networks, and more with minimal intervention. Improved Efficiency and Speed Manual configurations through control panels were the standard way of overseeing network infrastructure. Today, IaC simplifies and automates countless to-do’s with templates and scripts. As data centers grow in complexity and size, the shift to code-driven management becomes more crucial. IaC also has faster deployment times compared to traditional configuration methods. Many of these frameworks operate through cloud servers, which have a rapid time-to-market. Resource provisioning of physical and digital components is labor-intensive, and IaC could alleviate burdens. Studies have shown a 90% reduction in these tasks for data center migrations, enhancing productivity. Consistency and Standardization Developers have two main options for controlling IaC: declarative and imperative. The declarative approach establishes coherency in data center resources because the coder informs the system of their preferred end state. Based on this information, the code designs network consistency. Imperative techniques list the steps more directly. Both enable standardization by phasing out concerns like configuration drift, which is common in facilities without IaC. Over time, patches and updates cause servers to deviate from their fundamentals. IaC prevents this from happening by making every device and environment mirror each other. Enhanced Collaboration Cloud providers and novel APIs facilitate enhanced communication between large DevOps teams in IaC deployments. It brings clarity to who made changes and when. Detecting performance compromises or triaging a cybersecurity threat becomes more straightforward. This is because developers can see how the system has reacted in real-time to updates and other interventions. The improved connectivity between teammates can also alleviate the negative impacts of labor shortages in the data center space. IaC can supplement and support strained workforces. Cost and Resource Savings Data centers are already expensive, demanding high budgets to enable uninterruptible energy distribution. 
They also have to handle modern technologies like widespread artificial intelligence (AI) usage. A query to ChatGPT requires 10 times the energy than a Google search, and data centers need to save as many resources as possible to anticipate these consumption trends. Removing manual configuring could open more resources to boost overall facility efficiency. Market reporting claims IaC could cut infrastructure costs by 30%, making it a competitive solution for companies. Tips for Implementing Infrastructure as Code Incorporating IaC can be motivating instead of daunting by using these best strategies during conversion. They include: Gradually scaling applications: Start with one application and slowly grow into more complex collaborative structures for tracking and customization.Using modular code: Make scripts reusable and adaptive to prevent frequent changes and duplication. Pair with continuous integration/continuous deployment (CI/CD): Automates code testing and merges redundant activities to eliminate bugs. However, these strategies will only make an impact in data centers if developers and stakeholders invest in training and education. Everyone must know the mandatory compliance frameworks they need to follow, including cloud-specific and privacy-related processes. They must also know how to report IaC activities in data centers to auditors and shareholders to validate its success. Making the Most of IaC in Data Centers Data center stakeholders have everything to gain by implementing IaC for machinery and digital environment configurations. They are customizable and automated, reducing workloads and making critical infrastructure safer. Developers and building managers must develop a plan to integrate IaC to handle the load demands data centers will have in the near future.
Typically, complexity in programming is managed by breaking down tasks into subtasks. These subtasks can then be executed concurrently. Since Java 5, ExecutorService API helps the programmer execute these subtasks concurrently. However, given the nature of concurrent execution, each subtask could fail or succeed independently with no implicit communication between them. The failure of one subtask does not automatically cancel the other subtasks. Although an attempt can be made to manage this cancellation manually via external handling, it's quite tricky to get it right — especially when a large number of subtasks are involved. This could potentially result in loose threads (alternatively known as thread leaks). Although virtual threads are a cost-effective way to dedicate a thread to each (sub)task, managing their results remains a challenge. Executor service allows one thread to create it and another thread to submit tasks to this executor service. The threads that perform the tasks bear no relation to both of these threads. Moreover, a completely different thread can await the result of execution from these subtasks via reference to its Future object — which is provided immediately upon task submission to the executor service. Thus, effectively, the subtask does not have to return to the task that submitted it. It could possibly return to multiple threads or none. Also, the relationship between a task and its subtask is only logical and not visible in the code structure. There is no enforcement or tracking of the task-subtask relationship in runtime. Structured concurrency in Java aims to solve all of the above challenges by: Reliably automating the cancellation of subtasks; avoiding thread leaks and delays.Ensuring a (sub)task return only to the thread that submitted it.Enforcing a structured relation between a task and its subtasks — which could be nested as well. Unstructured Concurrency Let's understand more about the current situation of unstructured concurrency in Java. Consider a function handle() which fetches user information and its associated order. Java Response handle() throws ExecutionException, InterruptedException { Future<String> user = esvc.submit(() -> findUser()); Future<Integer> order = esvc.submit(() -> fetchOrder()); String theUser = user.get(); // Join findUser int theOrder = order.get(); // Join fetchOrder return new Response(theUser, theOrder); } At first glance, the code seems simple and does what it intends to do. However, on closer look, we could identify multiple issues: If findUser() throws exception then the thread running fetchOrder() leaks as the latter has no information or knowledge about the former’s execution status. If thread running handle()is interrupted, then both the threads running findUser() and fetchOrder() leaks as these threads, too, have no knowledge of the thread that spawned them.If findUser()took too long, and meanwhile fetchOrder() fails, the failure would only be identified when order.get() is invoked.Although the code is structured as task related to its subtask, this relation is merely logical and is neither explicitly described at compile time nor enforced during runtime. The first three situations arise due to the lack of an automated mechanism for cancelling the other threads. This potentially leads to resource wastage (as threads continue to execute), cancellation delays, and, at worst, these leaked threads may interfere with other threads, too. 
Although we may attempt to handle the cancellation manually, not only is it tricky to get right, but it also complicates the overall program and creates more room for errors. The fourth situation arises due to a lack of formal syntax that binds the threads into a parent-child relationship reflecting the task-to-subtask hierarchy. The Future objects provided are unhelpful in this case, too. All these limitations stem from the unstructured nature of concurrency via ExecutorService and Future, which lacks an automated way of cancelling tasks or tracking task-subtask relationships. Structured Concurrency Structured concurrency is built on the principle that if a task splits into concurrent subtasks, they all return to the same place, namely the task's code block. In structured concurrency, the task awaits the subtasks' results and monitors them for failures. The APIs define well-defined entry and exit points for the flow of execution through a block of code, and they enforce a strict nesting of the lifetimes of operations in a way that mirrors their syntactic nesting in the code. Structured concurrency has a natural synergy with virtual threads. A new virtual thread can be dedicated to every task, and when a task fans out by submitting subtasks for concurrent execution, it can dedicate a new virtual thread to each subtask. Moreover, the task-subtask relationship forms a tree, with each virtual thread carrying a reference to its unique parent. While virtual threads deliver an abundance of threads, structured concurrency can correctly and robustly coordinate them. This also enables observability tools to display threads as they are understood by the developer. StructuredTaskScope In the structured concurrency API, StructuredTaskScope is the principal class. The earlier example of the handle() function, rewritten with StructuredTaskScope, is shown below. Java Response handle() throws ExecutionException, InterruptedException { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Supplier<String> user = scope.fork(() -> findUser()); Supplier<Integer> order = scope.fork(() -> fetchOrder()); scope.join() // Join both sub-tasks .throwIfFailed(); // ... and propagate errors // Here, both sub-tasks have succeeded, so compose their results return new Response(user.get(), order.get()); } } With these APIs, we achieve all of the below, which addresses the shortcomings of unstructured concurrency discussed so far: On failure of either findUser() or fetchOrder(), the other is automatically cancelled if it has not completed yet. If the thread running handle() is interrupted before or during the invocation of join(), both subtasks, findUser() and fetchOrder(), are cancelled if they have not completed yet. The task structure and the code mirror each other with abundant clarity: the scope is the task (parent), while the forks are subtasks (children). The task waits for the subtasks to complete or be cancelled and decides whether to succeed or fail, with no lifecycle-management overhead. An additional benefit of the task-to-subtask hierarchy is a major improvement in observability: the call stack or thread dump clearly displays the relationship between handle(), findUser(), and fetchOrder(), which can easily be understood by the developer. With the automatic cancellation/cleanup achieved via the ShutdownOnFailure policy, thread leaks are avoided altogether. This policy can be customized or replaced with other available policies.
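As a rough illustration of such customization, here is a sketch of a scope that collects every successful result instead of short-circuiting. It is written against the JDK preview API and follows the pattern shown in the JDK documentation; method names such as handleComplete() and ensureOwnerAndJoined() belong to the preview API and may change between releases.
Java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;

// A custom scope that keeps the results of all successful subtasks
// and ignores failures, rather than shutting down on the first failure or success.
class CollectingScope<T> extends StructuredTaskScope<T> {
    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
    }

    // Intended to be called by the scope owner after join(); returns the collected results.
    public Stream<T> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }
}
A caller would fork its subtasks into a CollectingScope, call join(), and then read results(), exactly as with the built-in policies.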
Below are a few of the important characteristics of StructuredTaskScope a developer should be aware of As of JDK 24, this is still a preview feature and is thus disabled by default.The thread that creates the scope is its owner.With every invocation of fork(…) new virtual thread is started, for execution of subtask, by default.A subtask can create its own nested StructuredTaskScope to fork its own subtasks, thus creating a hierarchy. Once the scope is closed, all of its subtasks are guaranteed to be terminated, ensuring that no threads are leaked.The scope owner or any of its (nested) subtasks can invoke shutdown()on scope. Thus initiating cancellation of other unfinished subtasks. Moreover, forking of new subtasks is prevented too.If shutdown() or fork()is invoked by thread which either is non-owner or not part of scope hierarchy, WrongThreadException is thrown.Calling either join() or joinUntil(Instant) within a scope is mandatory. If a scope's block exits before joining, then the scope will wait for all subtasks to terminate and then throw an exception.When join() completes successfully, then each of the subtasks has either completed successfully, failed, or been cancelled because the scope was shut down.StructuredTaskScope enforces structure and order upon concurrent operations. Thus, using a scope outside of a try-with-resources block and returning without calling close(), or without maintaining the proper nesting of close() calls, may cause the scope's methods to throw a StructureViolationException. Shutdown Policies To avoid any unnecessary processing during concurrent subtask execution, it is common to use short-circuiting patterns. In a structured concurrency shutdown, policies can be utilized for such requirements. Two subclasses of StructuredTaskScope, ShutdownOnFailure and ShutdownOnSuccess, support these patterns with policies that shut down the scope when the first subtask fails or succeeds, respectively. The earlier example already shows the usage of ShutdownOnFailure while below example exhibits usage of ShutdownOnSuccess which returns the result of the first successful subtask. A real-world example would be to query from multiple servers for a response and return results of the very first response from any of the servers (tail chopping). Java <T> T race(List<Callable<T>> tasks, Instant deadline) throws InterruptedException, ExecutionException, TimeoutException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) { for (var task : tasks) { scope.fork(task); } return scope.joinUntil(deadline) .result(); // Throws if none of the sub-tasks completed successfully } } Note that as soon as one subtask succeeds, this scope automatically shuts down, canceling unfinished subtasks. The task fails if all of the subtasks fail or if the given deadline elapses. While these two shutdown policies are provided out of the box, they can be customized as per requirements. Processing Results Shutdown policies additionally provide centralized methods for handling exceptions and, possibly, successful results. This is in line with the spirit of structured concurrency, according to which an entire scope is treated as a unit. The scope’s owner can process the results of the subtasks using the Subtask objects returned from the calls to fork(...) if they are not processed by the policy. If the shutdown policy itself processes subtask results — as in the case of ShutdownOnSuccess — then the Subtask objects returned by fork(...) should be avoided altogether, and the fork(...) 
method treated as if it returned void. Subtasks should return, as their result, any information that the scope owner should process after centralized exception handling by the policy. Structured Concurrency in Various Programming Languages While structured concurrency is still being previewed in Java, it is already available in various programming languages. Here is sneak peak for few of these languages. Kotlin Coroutines. Kotlin offers coroutines, which are a lightweight concurrency model that allows for asynchronous programming without blocking threads. Coroutines provide structured concurrency through scopes, ensuring that tasks are properly managed and canceled when necessary.Structured concurrency. Kotlin’s structured concurrency is built into its coroutine framework, making it easy to write concurrent code that is both efficient and easy to understand. Go Goroutines. Go uses goroutines, which are lightweight threads managed by the Go runtime. Goroutines can be easily created and managed, allowing for high concurrency.Channels. Go provides channels for communication between goroutines, enabling structured concurrency by ensuring that tasks can communicate and synchronize effectively. Python Asyncio. Python’s asyncio library provides support for asynchronous programming using coroutines. Asyncio allows for structured concurrency through the use of tasks and event loops, ensuring that tasks are properly managed and canceled when necessary.Task groups. Python’s asyncio library includes task groups, which provide a way to manage and cancel groups of tasks together, ensuring that tasks are properly coordinated. C# Async/Await. C# provides support for asynchronous programming using the async and await keywords. This allows for structured concurrency by ensuring that tasks are properly managed and cancelled when necessary.Task Parallel Library (TPL). The TPL provides support for parallel programming in C#, allowing for the creation and management of tasks in a structured manner. Conclusion In conclusion, structured concurrency in Java represents a significant evolution in concurrent programming, addressing many of the shortcomings found in traditional unstructured concurrency models. Key advantages of structured concurrency in Java include: Automated cancellation. Subtasks are automatically cancelled upon failure, reducing resource wastage and eliminating the complexity of manual cancellation handling.Clear task hierarchy. The hierarchical task structure enhances code readability, maintainability, and observability, making it easier to debug and monitor concurrent operations.Improved error handling. Centralized error handling through structured concurrency ensures predictable and robust behavior in the presence of exceptions.Enhanced observability. The clear parent-child relationships displayed in thread dumps and call stacks aid developers in understanding and managing concurrent tasks.Virtual threads. The synergy with virtual threads allows for efficient and scalable concurrent programming, making it possible to handle a large number of concurrent tasks without the overhead of traditional threads. By adopting structured concurrency, Java developers can write more efficient, reliable, and maintainable concurrent code, ultimately leading to better software quality and improved developer productivity. References and Further Reads Structured ConcurrencyOracle DocsJEP 499
Apache Doris provides multiple ways to import data, including StreamLoad, HdfsLoad (gradually replacing BrokerLoad), RoutineLoad, MySQLLoad, and others. StreamLoad is the most commonly used method, as many data synchronization tools like Flink, Spark, and DataX use it under the hood. Since StreamLoad is the underlying mechanism for Flink Doris Connector, Spark Doris Connector, and DataX, most data import issues tend to occur with StreamLoad. This article will discuss common import errors and their solutions. Several Common Data Import Errors 1. Partition Not Created Schema SQL CREATE TABLE IF NOT EXISTS tb_dynamic_partition_test2 ( `sid` LARGEINT NOT NULL COMMENT "Student ID", `name` VARCHAR(50) NOT NULL COMMENT "Student Name", `class` INT COMMENT "Class", `age` SMALLINT COMMENT "Age", `sex` TINYINT COMMENT "Gender", `phone` LARGEINT COMMENT "Phone", `address` VARCHAR(500) NOT NULL COMMENT "Address", `date` DATETIME NOT NULL COMMENT "Date of Data Entry" ) ENGINE=olap DUPLICATE KEY (`sid`, `name`) PARTITION BY RANGE(`date`)() DISTRIBUTED BY HASH (`sid`) BUCKETS 4 PROPERTIES ( "dynamic_partition.enable"="true", "dynamic_partition.start"="-3", "dynamic_partition.end"="1", "dynamic_partition.time_unit"="DAY", "dynamic_partition.prefix"="p_", "dynamic_partition.replication_num"="1", "dynamic_partition.buckets"="4" ); StreamLoad Command Asterisk curl --location-trusted -u root -H "column_separator:," -T /mnt/disk2/test.csv http://ip:8030/api/test/tb_dynamic_partition_test2/_stream_load Error Plain Text Reason: no partition for this tuple. tuple=+---------------+---------------+--------------------+--------------------+--------------------+------------------+-----------------+----------------+ Solution The error occurs when there is no partition for the data. To resolve this, create the missing partition: SQL -- Disable dynamic partition ALTER TABLE tb_dynamic_partition_test2 SET ("dynamic_partition.enable" = "false"); -- Add partition ALTER TABLE tb_dynamic_partition_test2 ADD PARTITION p_20240426 VALUES [("2024-04-26 00:00:00"), ("2024-04-27 00:00:00")) ("replication_num"="1"); -- Re-enable dynamic partition ALTER TABLE tb_dynamic_partition_test2 SET ("dynamic_partition.enable" = "true"); After adding the partition, data should import successfully. 2. Data and Column Type Mismatch Schema SQL CREATE TABLE IF NOT EXISTS test ( `sid` LARGEINT NOT NULL COMMENT "Student ID", `name` VARCHAR(5) NOT NULL COMMENT "Student Name", `class` INT COMMENT "Class", `age` SMALLINT COMMENT "Age", `sex` TINYINT COMMENT "Gender", `phone` LARGEINT COMMENT "Phone", `address` VARCHAR(5) NOT NULL COMMENT "Address", `date` DATETIME NOT NULL COMMENT "Date of Data Entry" ) ENGINE=olap DUPLICATE KEY (`sid`, `name`) DISTRIBUTED BY HASH (`sid`) BUCKETS 4 PROPERTIES ( "replication_num"="1" ); StreamLoad Command Shell curl --location-trusted -u root -H "column_separator:," -T /mnt/disk2/liyuanyuan/data/test.csv http://10.16.10.6:18739/api/test/test/_stream_load Data Plain Text 1, lisixxxxxxxxxxxxxxxxxxxx, 1001, 18, 1, 1008610010, bj, 2024-04-26 Error Plain Text Reason: column_name[name], the length of input is too long than schema. first 32 bytes of input str: [lisixxxxxxxxxxxxxxxxxxxx] schema length: 5; actual length: 24; Solution The name column's data length exceeds the schema definition. To fix this, increase the length of the name column. SQL ALTER TABLE test MODIFY COLUMN name VARCHAR(50); Data should now import successfully. 3. 
Mismatched Columns Between Data and Schema Schema SQL CREATE TABLE IF NOT EXISTS test2 ( `sid` LARGEINT NOT NULL COMMENT "Student ID", `name` VARCHAR(50) NOT NULL COMMENT "Student Name", `class` INT COMMENT "Class", `age` SMALLINT COMMENT "Age", `sex` TINYINT COMMENT "Gender", `phone` LARGEINT COMMENT "Phone", `address` VARCHAR(50) NOT NULL COMMENT "Address", `date` DATETIME NOT NULL COMMENT "Date of Data Entry" ) ENGINE=olap DUPLICATE KEY (`sid`, `name`) DISTRIBUTED BY HASH (`sid`) BUCKETS 4 PROPERTIES ( "replication_num"="1" ); Data Plain Text 1, xxxxxxxxxxxxxxxxxxxxxxx, 1001, 18, 1, 1008610010, beijing, 2024-04-26, test_column StreamLoad Command Shell curl --location-trusted -u root -H "column_separator:," -T /mnt/disk2/liyuanyuan/data/test2.csv http://10.16.10.6:18739/api/test/test2/_stream_load Error Plain Text Reason: actual column number in CSV file is more than schema column number. Actual number: 9, schema column number: 8. Solution To fix this, add the extra column to the schema: SQL ALTER TABLE test2 ADD COLUMN new_col VARCHAR(50); 4. Special Characters in CSV Causing Import Failure Special characters like commas within the data can cause issues during import, especially when columns contain delimiters. A good solution is to use the JSON format instead of CSV for such cases. If the spark or flink engine is used for import, set the following parameters. Solution Java properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); Alternatively, use the proper escape sequences for handling special characters in CSV files. Handling Special Characters in CSV Files 1. Data Containing Quotation Marks When dealing with CSV files where data is enclosed in quotation marks, it’s important to configure StreamLoad with the appropriate settings. Example Schema SQL CREATE TABLE IF NOT EXISTS test3 ( `sid` LARGEINT NOT NULL COMMENT "Student ID", `name` VARCHAR(50) NOT NULL COMMENT "Student Name", `class` INT COMMENT "Class", `age` SMALLINT COMMENT "Age", `sex` TINYINT COMMENT "Gender", `phone` LARGEINT COMMENT "Phone", `address` VARCHAR(50) NOT NULL COMMENT "Address", `date` DATETIME NOT NULL COMMENT "Date of Data Entry" ) ENGINE=olap DUPLICATE KEY (`sid`, `name`) DISTRIBUTED BY HASH (`sid`) BUCKETS 4 PROPERTIES ( "replication_num"="1" ); Data Plain Text "1","xxxxxxx","1001","18","1","1008610010","beijing","2024-04-26" StreamLoad Command Shell curl --location-trusted -u root -H "column_separator:," -H "enclose:\"" -H "trim_double_quotes:true" -T /path/to/test3.csv http://ip:8030/api/test/test3/_stream_load Solution Plain Text enclose: specifies a enclose character. trim_double_quotes: to true when cutting the CSV file for each field of the outermost double quotation marks. 2. Data Containing Partial Quotes Data Plain Text "1","xx,x,x,xxx",1001,18,"1",1008610010,"bei,jing",2024-04-26 StreamLoad Command Shell curl --location-trusted -u root -H "column_separator:," -H "enclose:\"" -H "trim_double_quotes:true" -T /mnt/disk2/liyuanyuan/data/test4.csv http://10.16.10.6:18739/api/test/test4/_stream_load Handling Windows Line Endings If data is imported from Windows (with \r\n line endings) causes issues where queries do not return expected results, check if the Windows line endings are present. 
Solution Use od -c to check for \r\n and specify the correct line delimiter during import: Shell -H "line_delimiter:\r\n" Using Expression in StreamLoad Example 1 SQL CREATE TABLE test_streamload ( user_id BIGINT NOT NULL COMMENT "User ID", name VARCHAR(20) COMMENT "User Name", age INT COMMENT "User Age" ) DUPLICATE KEY (user_id) DISTRIBUTED BY HASH (user_id) BUCKETS 10 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" ); StreamLoad Command Shell curl --location-trusted -u "root:" -T /path/to/data.csv -H "format:csv" -H "column_separator:," -H "columns:user_id,tmp,age,name=upper(tmp)" http://ip:8030/api/test/test_streamload/_stream_load Example 2 SQL CREATE TABLE test_streamload2 ( c1 INT, c2 INT, c3 VARCHAR(20) ) DUPLICATE KEY (c1) DISTRIBUTED BY HASH (c1) BUCKETS 10 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" ); StreamLoad Command Shell curl --location-trusted -u "root:" -T /path/to/data.csv -H "format:csv" -H "column_separator:," -H "columns:c1,c2,A,B,C,c3=CONCAT(A,B,C)" http://ip:8030/api/test/test_streamload2/_stream_load Conclusion By understanding and addressing these common data import errors, you can significantly reduce the time spent troubleshooting and ensure smoother data synchronization with Apache Doris.
Why Would One Even Construct an AI Meme Generator? Memes are literally the one thing on the internet that anyone can understand. Whether you want to take a jab at your friend or want to show how coding sometimes gives you brain freezes, memes will always come to your rescue. The issue? Manually doing everything takes ages. You need to source the right picture, come up with snarky lines, and then figure out how to stick everything together without making it look like something a 5-year-old put together. But now, there are tools such as OpenAI and DeepSeek. With these, you don’t just automate comedy; you also automate currently trending formats and allow users to create memes in a matter of seconds. Here is how we approached our tasks: To generate engaging captions from memes, we created a context-specific approach.We built a super simple and straightforward drag-and-drop design interface. Finding new ways to economize API expenses allowed us to stay within the budget.Allowing users to store their most liked memes and adding a text-to-image meme feature made it possible. Looking Back at My Favorite Tools Before diving into the nitty-gritty details of the code, let’s discuss the tech stack a bit. And, just as a side note, assuming that you’ll be constructing a house without knowing what tools you’ll use is impractical. React + TypeScript. For the smooth, responsive UI React had brought into the world, TypeScript enabled the Team to catch many bugs that would have previously occurred.OpenAI/DeepSeek APIs. As long as there was a budget, the rest was history, with Division-04 being capable of delivering incisive, funny captions at will using GPT-4 Turbo. When they were limited, DeepSeek saved the day.Fabric.js. This library helps to have images with text dragged around easily, rather than feeling like one is trying to wrestle with a piglet drenched in oil.Vercel. Deployment utopia. It was also great during peak times because edge caching softened the blow.Redis. Low barrier to entry for protecting against rate limit and API abuse enforcement. Step 1: Set Up Your Own AI Brain What is clear is that an AI copying phrases from the internet will not work for memes like an AI telling you the response is “that’s hilarious” can. Memes require an amalgam of attitude, phrasing, and some level of restraint. This brings us to the more fundamental problem of how you tell an AI to make jokes. Tweaking the prompts of the AI itself, of course. Here’s a snip of the code used to create the captions: JavaScript // src/services/aiService.ts type MemePrompt = { template: string; // e.g., "Distracted Soul" context: string; // e.g., "When your code works on the first try" }; const generateMemeCaption = async ({ template, context }: MemePrompt) => { const prompt = ` Generate a sarcastic meme caption for the "${template}" template about "${context}". Rules: - Use Gen-Z slang (e.g., "rizz", "sigma") - Max 12 words - Add emojis related to the context `; const response = await openai.chat.completions.create({ model: "gpt-4-turbo", messages: [{ role: "user", content: prompt }], temperature: 0.9, // Higher = riskier jokes max_tokens: 50, }); return stripEmojis(response.choices[0].message.content); // No NSFW stuff allowed }; Pro tip: For humor, keep it around 0.7 to 0.9, but make sure to always moderate the response through OpenAI’s moderation endpoint for safety reasons. Step 2: Constructing the Meme Canvas If you attempted to deal with the HTML5 Canvas APIs, you understand how not straightforward they are to deal with. 
Luckily, Fabric.js came to the rescue. It gave us Photoshop-like controls directly inside React with the added bonus of drag-and-drop. Take a look at this simplified version of our canvas component: TypeScript // src/components/MemeCanvas.tsx import { useState } from "react"; import { FabricJSCanvas, useFabricJSEditor } from "fabricjs-react"; export default function MemeCanvas() { const { editor, onReady } = useFabricJSEditor(); const [textColor, setTextColor] = useState("#FFFFFF"); const addTextLayer = (text: string) => { editor?.addText(text, { fill: textColor, fontFamily: "Impact", fontSize: 40, stroke: "#000000", strokeWidth: 2, shadow: "rgba(0,0,0,0.5) 2px 2px 2px", }); }; return ( <> <button onClick={() => addTextLayer("Why React, why?!")}>Add Default Text</button> <input type="color" onChange={(e) => setTextColor(e.target.value)} /> <FabricJSCanvas className="canvas" onReady={onReady} /> </> ); } Perks of this: Text layers can be dragged anywhere on the canvas. Color, stroke, and shadow can be set with the color picker. Double-click-to-edit text streamlines the editing process. Step 3: Rate Limiting Imagine this for a moment: You release your app, and all of a sudden, everybody wants to make memes. Sounds fun, right? Until the OpenAI bill shoots up more than the price of Bitcoin. To address this, we put in place sliding window rate limiting with Redis. This is how we did it on Vercel Edge Functions: JavaScript // src/app/api/generate-caption/route.ts import { Ratelimit } from "@upstash/ratelimit"; import { Redis } from "@upstash/redis"; const ratelimit = new Ratelimit({ redis: Redis.fromEnv(), limiter: Ratelimit.slidingWindow(15, "86400s"), // 15 requests/day per IP }); export async function POST(request: Request) { const ip = request.headers.get("x-forwarded-for") ?? "127.0.0.1"; const { success } = await ratelimit.limit(ip); if (!success) { return new Response("Slow down, meme lord! Daily limit reached.", { status: 429, }); } // Proceed with OpenAI call } Hacks to save costs: Cache popular prompts such as "Hotline Bling" and "When Pull Request gets approved." Use Cloudflare to cache generated images. AI-Generated Meme Images From DALL-E 3 Sometimes, we learn the hard way that selecting the perfect meme template is an impossible task. JavaScript // src/services/aiService.ts const generateCustomMemeImage = async (prompt: string) => { const response = await openai.images.generate({ model: "dall-e-3", prompt: ` A meme template about "${prompt}". Style: Flat vector, bold outlines, no text. Background: Solid pastel color. `, size: "1024x1024", quality: "hd", }); return response.data[0].url; }; Changing the Output Prompt: "Two developers in a dispute over Redux and Zustand frameworks." Final product: An argument between Redux and Zustand cartoon characters, illustrated as two stylized icons facing off against a purple background. Meme History Feature (Zustand + LocalStorage) In order to enable users to keep their creations, we added a meme history feature with the help of Zustand.
JavaScript // src/stores/memeHistory.ts import { create } from "zustand"; import { persist } from "zustand/middleware"; type Meme = { id: string; imageUrl: string; caption: string; timestamp: number; }; interface MemeHistoryState { memes: Meme[]; saveMeme: (meme: Omit<Meme, "id" | "timestamp">) => void; } export const useMemeHistory = create<MemeHistoryState>()( persist( (set, get) => ({ memes: [], saveMeme: (meme) => { const newMeme = { ...meme, id: crypto.randomUUID(), timestamp: Date.now(), }; set({ memes: [newMeme, ...get().memes].slice(0, 100) }); }, }), { name: "meme-history" } ) ); User Flow Create a meme, then click save. The meme is stored locally and displayed in a grid. Clicking a saved meme reloads it in the editor. Closing Thoughts Building an AI meme generator helped deepen my understanding, not just of coding, but of how to handle unexpected scenarios. I learned the hard way the importance of preparation, from implementing strict rate limits to enduring Reddit traffic surges. So, give it a try, work from the bottom up while making changes based on the feedback you receive, and enjoy yourself in the process. Perhaps your app might become popular, making you the next meme millionaire.