
Data Pipeline Techniques in Action

Take a deep dive into the architectural concepts of data pipelines along with a hands-on tutorial for implementation, demonstrating the concepts in action.

By Sohil Shah · Aug. 21, 24 · Tutorial

The topics covered are:

  • Data pipeline architecture
  • High-scale data ingestion
  • Data transformation and processing
  • Data storage
  • Staging data delivery
  • Operational data
  • Hands-on exercise

[Image: data pipeline architecture, with data governance, orchestration, and monitoring]

Data Pipeline Architecture

A data pipeline architecture consists of components and systems that provide services at every layer of the end-to-end data flow. The phases are data ingestion, data transformation and processing, data storage, data delivery, and data consumption for applications and operational data stores.

The data flow starts with an arbitrary data source, which can be a database, flat files, an enterprise application, real-time streaming data, etc.

High-Scale Data Ingestion

Data ingestion is the process of acquiring, mapping, and validating the received data and moving it to the data processing phase. Architecture facets associated with the data ingestion process for the modern data pipeline architecture are as follows:

  • Data mapping: Map the incoming data structure to the pipeline format, either through configuration or by flattening the data on the fly. A schema-agnostic approach is elastic: any format such as JSON, XML, or CSV can be flattened into an unstructured format that the pipeline processes (see the flattening sketch after this list).
  • Data elasticity: Unflatten the unstructured pipeline data back into a structured format for analysis.
  • High-scale ingestion: The ability to handle the volume and velocity of the data being generated
  • Batch-based ingestion: Pulls data from the source on a scheduled basis
  • Real-time ingestion: Receives data as a real-time stream
  • Ordering: Data events are ordered based on the timestamp of receipt.
  • Data consistency: No duplicates and no data loss; if the same data event is received more than once, only one copy is kept, and every received event is saved in a log.
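
To make the "flatten on the fly" idea concrete, here is a minimal, illustrative sketch (not the Braineous implementation) that flattens a nested Gson JsonObject into dot-notation key/value pairs, similar to the addr.email and addr.phone columns that appear in the Flink schema later in this tutorial:

Java
 
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

import java.util.LinkedHashMap;
import java.util.Map;

public class JsonFlattener {

    // Recursively flatten a JSON element into dot-notation keys such as "addr.email".
    public static Map<String, String> flatten(JsonElement element) {
        Map<String, String> flat = new LinkedHashMap<>();
        flatten("", element, flat);
        return flat;
    }

    private static void flatten(String prefix, JsonElement element, Map<String, String> flat) {
        if (element.isJsonObject()) {
            for (Map.Entry<String, JsonElement> entry : element.getAsJsonObject().entrySet()) {
                String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
                flatten(key, entry.getValue(), flat);
            }
        } else if (element.isJsonArray()) {
            int i = 0;
            for (JsonElement item : element.getAsJsonArray()) {
                flatten(prefix + "[" + i++ + "]", item, flat);
            }
        } else {
            flat.put(prefix, element.getAsString());
        }
    }

    public static void main(String[] args) {
        String json = "{\"id\":1,\"name\":\"name_1\",\"addr\":{\"email\":\"name_1@email.com\",\"phone\":\"123\"}}";
        // Prints: id=1, name=name_1, addr.email=name_1@email.com, addr.phone=123
        flatten(JsonParser.parseString(json)).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}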

Data Transformation and Processing

This phase cleans and transforms the data for the downstream system, then processes it for the data storage phase that follows.

Architecture facets associated with the data transformation/processing phase of the modern data pipeline architecture:

  • Cleaning: Make the data relevant.
  • Transformation: Change values for relevance to the use case.
  • Processing at scale: Run many parallel data pipelines while the data stays consistent.
  • Processing with accuracy: No duplicates on repeated data events; the source dataset must match the destination dataset (see the deduplication sketch after this list).
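
As an illustration of the accuracy requirement, here is a minimal deduplication sketch (an assumption for illustration, not the platform's processing code) that fingerprints each incoming event and drops repeats so the destination dataset matches the source:

Java
 
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Set;

public class DeduplicatingProcessor {

    // Fingerprints of events already processed in this run; a production pipeline
    // would keep this state in a durable log rather than in memory.
    private final Set<String> seen = new HashSet<>();

    // Returns true if the event was processed, false if it was a duplicate and dropped.
    public boolean process(String rawEvent) throws Exception {
        String fingerprint = sha256(rawEvent);
        if (!seen.add(fingerprint)) {
            return false; // duplicate event: skip it so source and destination stay identical
        }
        // ... clean/transform the event and hand it to the data storage phase here ...
        return true;
    }

    private static String sha256(String value) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(value.getBytes(StandardCharsets.UTF_8));
        return new BigInteger(1, digest).toString(16);
    }
}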

Data Storage

This phase persists the processed data in a storage system such as a data warehouse or a data lake. The data stored in these systems can serve many use cases, such as analytics, training AI models, bridging data silos between enterprise applications, business intelligence, etc.

Architecture facets associated with the data storage process are as follows:

  • Storing data: Store data for future retrieval, analysis, or any other use case.
  • Organizing data: Organize data so that it is optimized for query performance.
  • Security and compliance: Manage the data lifecycle for retention and regulatory compliance, where applicable.

Staging Data Delivery

This phase is where data pipeline implementations differ. Some treat the previous data storage phase as the final destination. Others provide this phase to deliver processed data to the individual storage systems used by operational systems; that results in two steps: staging the data, followed by an automated or manual process that makes the data operational. Implementations also differ in which storage systems they support out of the box, typically standard systems such as relational databases, NoSQL stores, real-time search systems, etc.

Architecture facets associated with the staging data delivery phase include:

  • Parallel or serial: Data delivery can run in parallel, with one route storing data in a data lake or data warehouse while another delivers data for staging. Parallel delivery is more efficient and faster.
  • Data connector support: A data pipeline implementation will provide support for standard storage systems like MongoDB, MySQL, Elasticsearch, Snowflake, etc.
  • Data connector framework: It may also provide a data connector framework for connecting to custom storage systems (a hypothetical interface sketch follows this list).
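
The tutorial below references a built-in connector, com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore, in the pipe configuration. Braineous' actual connector SPI is not shown in this article, so the following is only a hypothetical sketch of what a staging-store connector contract could look like; the interface and method names are assumptions:

Java
 
import com.google.gson.JsonObject;

import java.util.List;

// Hypothetical connector contract, for illustration only; the real Braineous SPI may differ.
// The idea: every staging store (MongoDB, MySQL, Elasticsearch, ...) implements the same
// lifecycle so the pipeline can deliver staged data to any of them, built-in or custom.
public interface StagingStoreConnector {

    // Receives the "config" section of the pipe registration
    // (connectionString, database, collection, ...).
    void configure(JsonObject config);

    // Delivers a batch of processed records for the given pipe and entity.
    void store(String pipeId, String entity, List<JsonObject> records);

    // Releases resources when the pipe is unregistered or the platform shuts down.
    void close();
}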

Supporting this phase with a data pipeline has the following advantages:

  • Enterprise data silos: Connect multiple storage systems within your enterprise to keep them in sync in real time with data consistency.
  • Supporting external data partners: Deliver data to multiple external storage systems.

Operational Data Phase

This is another phase where data pipeline implementations differ. It processes the staged data in operational systems and turns it into live data.

Architecture facets associated with the operational data phase are:

  • Automated and real-time: A data pipeline can provide a data integration framework that invokes custom logic to process staged data and make it live data as part of the data flow process.
  • Batch or manual: Alternatively, an organization can use a background process, or process the staged data manually, to make it live data (a minimal batch sketch follows this list).
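
To illustrate the batch option, here is a minimal sketch of a background job that promotes staged MongoDB documents into a live collection. It reuses this tutorial's connection string, database ("yyya"), and staging collection ("data"); the "data_live" collection name and the promotion logic itself are assumptions for illustration:

Java
 
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

// Hypothetical batch promotion job: copy each staged document into a "live" collection,
// then clear the promoted documents from staging so the job can safely run again.
public class StagedDataPromoter {

    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoDatabase db = client.getDatabase("yyya");
            MongoCollection<Document> staged = db.getCollection("data");
            MongoCollection<Document> live = db.getCollection("data_live");

            List<Object> promotedIds = new ArrayList<>();
            for (Document doc : staged.find()) {
                live.insertOne(doc);              // make the record live
                promotedIds.add(doc.get("_id"));
            }
            // Remove promoted records from staging once they are live.
            staged.deleteMany(Filters.in("_id", promotedIds));
        }
    }
}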

Hands-On Exercise

This hands-on exercise implements an end-to-end data pipeline whose data flow covers the following phases:

  • Data ingestion
  • Data transformation and processing
  • Data storage
  • Staging data delivery

It uses the open-source Braineous data platform (download here). Ignore the "Too big to load" message and click on the Download button.

  • This tutorial is located under braineous-1.0.0-cr3/tutorials/get-started.

Installation

Note that Braineous has the following service dependencies:

  • Zookeeper
  • Kafka
  • Flink
  • Hive MetaStore

The latest release, CR3, requires adjusting the startup scripts for these services to match your localhost setup. The next release, CR4, will streamline localhost service detection and provide a Kubernetes container out of the box.

Here are the localhost setup instructions for each service before running Braineous:

Zookeeper

Modify the braineous-1.0.0-cr3/bin/start_zookeeper.sh script from:

Shell
 
~/mumma/braineous/infrastructure/kafka_2.13-3.5.0/bin/zookeeper-server-start.sh 
/Users/babyboy/mumma/braineous/infrastructure/kafka_2.13-3.5.0/config/zookeeper.properties


. . . to your local Zookeeper bin and config directories.

Kafka

Modify the braineous-1.0.0-cr3/bin/start_kafka.sh script from:

Shell
 
~/mumma/braineous/infrastructure/kafka_2.13-3.5.0/bin/kafka-server-start.sh 
/Users/babyboy/mumma/braineous/infrastructure/kafka_2.13-3.5.0/config/server.properties


. . . to your local Kafka bin and config directories.

Flink

Modify the braineous-1.0.0-cr3/bin/start_flink.sh script from:

Shell
 
~/mumma/braineous/infrastructure/flink-1.18.1/bin/start-cluster.sh


. . . to your local Flink bin directory.

Hive Metastore

Modify the braineous-1.0.0-cr3/bin/start_metastore.sh script from:

Shell
 
export HADOOP_HOME=~/mumma/braineous/infrastructure/hadoop-3.3.6
export HIVE_HOME=~/mumma/braineous/infrastructure/apache-hive-3.1.3-bin
export HADOOP_CLASSPATH=$HIVE_HOME/conf:$HIVE_HOME/lib

~/mumma/braineous/infrastructure/apache-hive-3.1.3-bin/bin/hive --service metastore


. . . to your respective Hadoop/Hive local installations. In addition, change the following property in the local_hive_installation/conf/hive-site.xml file from my localhost file system location to yours.

XML
 
<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/Users/babyboy/data_warehouse/hive</value>
    <description>location of default database for the warehouse</description>
</property>


Braineous

  • Modify braineous-1.0.0-cr3/conf/braineous.config:
Shell
 
#hive
hive_conf_directory=/Users/babyboy/mumma/braineous/infrastructure/apache-hive-3.1.3-bin/conf
table_directory=file:///Users/babyboy/datalake/


  • Modify the hive_conf_directory and table_directory properties to point to directories on your localhost filesystem.
  • CR4 will improve the out-of-the-box localhost experience and also provide a Kubernetes container.

Once your localhost services and configuration for Zookeeper, Kafka, Flink, and Hive Metastore are set up, follow the rest of the steps for the hands-on exercise to create the data pipeline.

Step 1: Install MongoDB

Shell
 
brew tap mongodb/brew


Shell
 
brew update


Shell
 
brew install mongodb-community@7.0


Step 2: Start Zookeeper

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./start_zookeeper.sh


Step 3: Start Kafka

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./start_kafka.sh


Step 4: Start Flink

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./start_flink.sh


Step 5: Start Hive Metastore

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./start_metastore.sh


Step 6: Start Braineous

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./start_braineous.sh


Step 7: Verify the Braineous Installation

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./test_installation.sh


Step 8: Create a Tenant with an API Key and Secret

Shell
 
cd braineous-1.0.0-cr3/bin


Shell
 
./pipemon.sh


Please use the generated API key and API secret for the tutorial; more instructions follow in the data ingestion section.

Data Pipeline Registration

Register a data pipe with the data ingestion engine:

JSON
 
{
  "pipeId": "yyya",
  "entity": "abc",
  "configuration": [
    {
      "stagingStore" : "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "yyya",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "yyya",
        "collection": "data",
        "jsonpathExpressions": []
      }
    }
  ]
}


For details about the pipe configuration, please refer to the Developer Documentation Guide.

Source Data

JSON
 
[
  {
    "id" : 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]


A dataset can be loaded from any data source such as a database, legacy production data store, live data feed, third-party data source, Kafka stream, etc. In this example, the dataset is loaded from a classpath resource located at src/main/resources/dataset/data.json.

Data Ingestion

Data Ingestion Client Implementation

Java
 
DataPlatformService dataPlatformService = DataPlatformService.getInstance();

String apiKey = "ffb2969c-5182-454f-9a0b-f3f2fb0ebf75";
String apiSecret = "5960253b-6645-41bf-b520-eede5754196e";

String datasetLocation = "dataset/data.json";
String json = Util.loadResource(datasetLocation);
JsonElement datasetElement = JsonUtil.validateJson(json);
System.out.println("*****DATA_SET******");
JsonUtil.printStdOut(datasetElement);

String configLocation = "pipe_config/pipe_config.json";
String pipeConfigJson = Util.loadResource(configLocation);
JsonObject configJson = JsonUtil.validateJson(pipeConfigJson).getAsJsonObject();
String pipeId = configJson.get("pipeId").getAsString();
String entity = configJson.get("entity").getAsString();
System.out.println("*****PIPE_CONFIGURATION******");
JsonUtil.printStdOut(configJson);

//configure the DataPipeline Client
Configuration configuration = new Configuration().
        ingestionHostUrl("http://localhost:8080/").
        apiKey(apiKey).
        apiSecret(apiSecret).
        streamSizeInObjects(0);
dataPlatformService.configure(configuration);

//register pipe
dataPlatformService.registerPipe(configJson);
System.out.println("*****PIPE_REGISTRATION_SUCCESS******");

//send source data through the pipeline
dataPlatformService.sendData(pipeId, entity, datasetElement.toString());
System.out.println("*****DATA_INGESTION_SUCCESS******");


Please use the API key and API secret generated in Step 8 of the Installation section for the variables:

  • apiKey
  • apiSecret

Data Ingestion Client Output

Expected output from the ingestion client:

Plain Text
 
[INFO] ------------------------------------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
*****DATA_SET******
******ARRAY_SIZE: 2**********
[
  {
    "id": 1,
    "name": "name_1",
    "age": 46,
    "addr": {
      "email": "name_1@email.com",
      "phone": "123"
    }
  },
  {
    "id": "2",
    "name": "name_2",
    "age": 55,
    "addr": {
      "email": "name_2@email.com",
      "phone": "1234"
    }
  }
]
**********************
*****PIPE_CONFIGURATION******
{
  "pipeId": "yyya",
  "entity": "abc",
  "configuration": [
    {
      "stagingStore": "com.appgallabs.dataplatform.targetSystem.core.driver.MongoDBStagingStore",
      "name": "yyya",
      "config": {
        "connectionString": "mongodb://localhost:27017",
        "database": "yyya",
        "collection": "data",
        "jsonpathExpressions": []
      }
    }
  ]
}
**********************
*****PIPE_REGISTRATION_SUCCESS******
***SENDING_DATA_START*****
*****DATA_INGESTION_SUCCESS******


Data Pipeline Server Output

Expected output on the data pipeline server:

Plain Text
 
2024-08-05 19:06:09,071 INFO  [org.ehc.siz.imp.JvmInformation] (pool-7-thread-1) Detected JVM data model settings of: 64-Bit OpenJDK JVM with Compressed OOPs
2024-08-05 19:06:09,178 INFO  [org.ehc.siz.imp.AgentLoader] (pool-7-thread-1) Failed to attach to VM and load the agent: class java.io.IOException: Can not attach to current VM
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.ehcache.sizeof.ObjectGraphWalker (file:/Users/babyboy/mumma/braineous/data_platform/cr3/braineous_dataplatform/releases/braineous-1.0.0-cr3/bin/dataplatform-1.0.0-cr3-runner.jar) to field java.util.LinkedList.first
WARNING: Please consider reporting this to the maintainers of org.ehcache.sizeof.ObjectGraphWalker
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
MONGODB: DATA_STORED_SUCCESSFULLY

2024-08-05 19:06:16,562 INFO  [org.apa.had.hiv.con.HiveConf] (pool-7-thread-1) Found configuration file null
2024-08-05 19:06:17,398 WARN  [org.apa.had.uti.NativeCodeLoader] (pool-7-thread-1) Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-08-05 19:06:17,492 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-7-thread-1) Setting hive conf dir as /Users/babyboy/mumma/braineous/infrastructure/apache-hive-3.1.3-bin/conf
2024-08-05 19:06:17,791 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-7-thread-1) Created HiveCatalog 'ffbaaaacaaaaaaaaafaaaabafafafbaebfaa'
2024-08-05 19:06:17,871 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-7-thread-1) Trying to connect to metastore with URI thrift://0.0.0.0:9083
2024-08-05 19:06:17,900 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-7-thread-1) Opened a connection to metastore, current connections: 1
2024-08-05 19:06:18,639 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-7-thread-1) Connected to metastore.
2024-08-05 19:06:18,641 INFO  [org.apa.had.hiv.met.RetryingMetaStoreClient] (pool-7-thread-1) RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=babyboy (auth:SIMPLE) retries=1 delay=1 lifetime=0
2024-08-05 19:06:18,907 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-7-thread-1) Connected to Hive metastore
2024-08-05 19:06:19,725 INFO  [org.apa.fli.tab.cat.CatalogManager] (pool-7-thread-1) Set the current default catalog as [ffbaaaacaaaaaaaaafaaaabafafafbaebfaa] and the current default database as [default].
2024-08-05 19:06:21,351 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-10-thread-1) Setting hive conf dir as /Users/babyboy/mumma/braineous/infrastructure/apache-hive-3.1.3-bin/conf
2024-08-05 19:06:21,381 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-10-thread-1) Created HiveCatalog 'ffbaaaacaaaaaaaaafaaaabafafafbaebfaa'
2024-08-05 19:06:21,387 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-10-thread-1) Trying to connect to metastore with URI thrift://0.0.0.0:9083
2024-08-05 19:06:21,388 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-10-thread-1) Opened a connection to metastore, current connections: 2
2024-08-05 19:06:21,396 INFO  [org.apa.had.hiv.met.HiveMetaStoreClient] (pool-10-thread-1) Connected to metastore.
2024-08-05 19:06:21,397 INFO  [org.apa.had.hiv.met.RetryingMetaStoreClient] (pool-10-thread-1) RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=babyboy (auth:SIMPLE) retries=1 delay=1 lifetime=0
2024-08-05 19:06:21,399 INFO  [org.apa.fli.tab.cat.hiv.HiveCatalog] (pool-10-thread-1) Connected to Hive metastore
2024-08-05 19:06:21,459 INFO  [org.apa.fli.tab.cat.CatalogManager] (pool-10-thread-1) Set the current default catalog as [ffbaaaacaaaaaaaaafaaaabafafafbaebfaa] and the current default database as [default].
(
  `id` STRING,
  `name` STRING,
  `age` STRING,
  `addr.email` STRING,
  `addr.phone` STRING,
  `0` STRING
)
2024-08-05 19:06:21,863 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlCreateCatalog does not contain a setter for field catalogName
2024-08-05 19:06:21,864 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlCreateCatalog cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,866 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlCreateView does not contain a setter for field viewName
2024-08-05 19:06:21,866 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlCreateView cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,868 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlAlterViewRename does not contain a getter for field newViewIdentifier
2024-08-05 19:06:21,868 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlAlterViewRename does not contain a setter for field newViewIdentifier
2024-08-05 19:06:21,868 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlAlterViewRename cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,871 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlAlterViewProperties does not contain a setter for field propertyList
2024-08-05 19:06:21,871 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlAlterViewProperties cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,873 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlAlterViewAs does not contain a setter for field newQuery
2024-08-05 19:06:21,873 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlAlterViewAs cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,874 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlAddPartitions does not contain a setter for field ifPartitionNotExists
2024-08-05 19:06:21,874 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlAddPartitions cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,875 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlDropPartitions does not contain a setter for field ifExists
2024-08-05 19:06:21,875 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlDropPartitions cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,876 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dql.SqlShowPartitions does not contain a getter for field tableIdentifier
2024-08-05 19:06:21,877 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dql.SqlShowPartitions does not contain a setter for field tableIdentifier
2024-08-05 19:06:21,877 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.dql.SqlShowPartitions cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,878 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dml.SqlTruncateTable does not contain a getter for field tableNameIdentifier
2024-08-05 19:06:21,878 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dml.SqlTruncateTable does not contain a setter for field tableNameIdentifier
2024-08-05 19:06:21,879 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.dml.SqlTruncateTable cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,879 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dql.SqlShowFunctions does not contain a setter for field requireUser
2024-08-05 19:06:21,879 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.dql.SqlShowFunctions cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,882 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dql.SqlShowProcedures does not contain a getter for field databaseName
2024-08-05 19:06:21,882 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.dql.SqlShowProcedures does not contain a setter for field databaseName
2024-08-05 19:06:21,885 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.dql.SqlShowProcedures cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:21,890 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) class org.apache.flink.sql.parser.ddl.SqlReplaceTableAs does not contain a setter for field tableName
2024-08-05 19:06:21,893 INFO  [org.apa.fli.api.jav.typ.TypeExtractor] (pool-10-thread-1) Class class org.apache.flink.sql.parser.ddl.SqlReplaceTableAs cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
2024-08-05 19:06:25,192 INFO  [org.apa.fli.cli.pro.res.RestClusterClient] (Flink-RestClusterClient-IO-thread-1) Submitting job 'insert-into_ffbaaaacaaaaaaaaafaaaabafafafbaebfaa.yyya.abc' (5854f94a78a2ef2e79b264b97db11592).
2024-08-05 19:06:41,151 INFO  [org.apa.fli.cli.pro.res.RestClusterClient] (Flink-RestClusterClient-IO-thread-4) Successfully submitted job 'insert-into_ffbaaaacaaaaaaaaafaaaabafafafbaebfaa.yyya.abc' (5854f94a78a2ef2e79b264b97db11592) to 'http://127.0.0.1:8081'.


Data Verification

To verify the success of the ingestion and delivery to the configured target databases, use the following MongoDB commands.

Expected Result

You should see two records added to a collection named "data" in a database named "yyya", corresponding to the configured configuration.config.database and configuration.config.collection values.

Shell
 
 mongosh


Shell
 
use yyya


Shell
 
show collections


Shell
 
db.data.find()


Shell
 
db.data.count()


Expected Output

Shell
 
yyya> db.data.find()
[
  {
    _id: ObjectId("66b924b40c964128eb12400a"),
    id: 1,
    name: 'name_1',
    age: 46,
    addr: { email: 'name_1@email.com', phone: '123' }
  },
  {
    _id: ObjectId("66b924b40c964128eb12400b"),
    id: '2',
    name: 'name_2',
    age: 55,
    addr: { email: 'name_2@email.com', phone: '1234' }
  }
]
yyya> 


Conclusion

The modern data pipeline architecture provides high-scale data ingestion, data elasticity, ordering, and data consistency. A data pipeline architecture can modernize an organization's data capabilities for driving insights, analytics, ETL/ELT use cases, and machine learning infrastructure, among other use cases in this digital age.


Opinions expressed by DZone contributors are their own.
