Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

ETL On-Premises Oracle Data to Google BigQuery Using Google Cloud Dataflow

DZone's Guide to

ETL On-Premises Oracle Data to Google BigQuery Using Google Cloud Dataflow

Learn how to ingest on-premises Oracle data with Google Cloud Dataflow via JDBC using the Hybrid Data Pipeline On-Premises Connector.

· Big Data Zone ·
Free Resource

Hortonworks Sandbox for HDP and HDF is your chance to get started on learning, developing, testing and trying out new features. Each download comes preconfigured with interactive tutorials, sample data and developments from the Apache community.

Google Cloud Dataflow is a data processing service for both batch and real-time data streams. Dataflow allows you to build pipes to ingest data, then transform and process according to your needs before making that data available to analysis tools. DataDirect Hybrid Data Pipeline can be used to ingest both on-premises and cloud data with Google Cloud Dataflow.

The tutorial below shows you how to ingest on-premises Oracle data with Google Cloud Dataflow via JDBC using the Hybrid Data Pipeline On-Premises Connector. Google Cloud Dataflow uses Apache Beam to create the processing pipelines. Beam has both Java and Python SDK options. The tutorial below uses a Java project, but similar steps would apply with Apache Beam to read data from JDBC data sources including SQL Server, IBM DB2, Amazon Redshift, Salesforce, Hadoop Hive, and more.

Setting Up Progress DataDirect Hybrid Data Pipeline

  1. Install Hybrid Data Pipeline in your DMZ or in the cloud by following the below tutorials.
  2. To connect to on-premises databases, you need to install an on-premises agent on one of your servers behind the firewall that lets the Hybrid Data Pipeline Server communicate with the database.
  3. To install Hybrid Data Pipeline’s On-Premise Agent and configure it the cloud service where you installed Hybrid Data Pipeline Server, please follow the below tutorials.  
  4. Also install the Hybrid Data Pipeline’s JDBC driver, which can be found on the same download page of Hybrid Data Pipeline Server and On-Premise Connector. Follow this documentation to install the driver.
  5. Once you have everything setup, navigate to http://<server-address>:8080/d2c-ui or https://<server-address>:8443/d2c-ui to view the Hybrid Data Pipeline’s UI.
  6. Log in with the default credentials d2cadmin/d2cadmin.
  7. Once you have logged in, create a New DataSource by clicking the New Data Source button, as shown below.
  8. Google Cloud Data Flow
  9. You should now see a list of all data stores, as shown below. Choose Oracle as your data store.
  10. Google Cloud Data Flow

  11. In the Configuration, fill out all the connection parameters that you would generally use to connect to your Oracle database and set the Connector ID. The Connector ID is the ID of the on-premises connector that you installed for this server and account in Step 3. If you have installed and configured the on-premise connector, you should automatically see the Connector ID in the dropdown.
  12. Google Cloud Data Flow
  13. Now, click on Test Connection, and you should now be able to connect to your Oracle database on-premise. Click the UPDATE button to save the configuration.

Setting Up Google Cloud Dataflow SDK and Project

  1. Complete the steps in the "Before you begin" section in this quick start from Google.
  2. To create a new project in Eclipse, go to FileNew > Project.
  3. In the Google Cloud Platform directory, select Google Cloud Dataflow Java Project.
  4. Google Cloud Data Flow
  5. Fill in Group ID and Artifact ID.
  6. Select Project template as Starter project with a simple pipeline from the dropdown.
  7. Select Dataflow version as 2.2.0 or above.
  8. Google Cloud Data Flow
  9. Click Next, and the project should be created.
  10. Add the JDBC IO library for Apache Beam from Maven and the DataDirect Hybrid Data Pipeline JDBC driver to the build path. You can find the Hybrid Data Pipeline JDBC driver in the install path.

Creating the Pipeline

  1. In this tutorial, the main goal will be to connect to an on-premises Oracle database, read the data, apply a simple transformation, and write it to BigQuery. The code for this project has been uploaded to GitHub for your reference.
  2. Open the StarterPipeline.java file and clear all the code in the main function.
  3. The first thing you need to do is create the pipeline. To create the pipeline:
    Pipeline p = Pipeline.create(         
    		 PipelineOptionsFactory.fromArgs(args).withValidation().create());
  4. This will be using input arguments to the program to configure the Pipeline.
  5. Connect to Oracle and read the data using  the JdbcIO.read() method to a PCollection, as shown below.
    PCollection<List<String>> rows = p.apply(JdbcIO.<List<String>>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.ddtek.jdbc.ddhybrid.DDHybridDriver", "jdbc:datadirect:ddhybrid://<;Hybrid-Data-Pipline-Server-Address>;hybridDataPipelineDataSource=<Data Source Name>")
                .withUsername("Username")
                .withPassword("Password"))
                .withQuery("SELECT * FROM TEST01.CUSTOMER")
                .withCoder(ListCoder.of(StringUtf8Coder.of()))
                .withRowMapper(new JdbcIO.RowMapper<List<String>>() {
                    public List<String> mapRow(ResultSet resultSet) throws Exception {
                         
                        List<String> addRow = new ArrayList<String>();
                        //Get the Schema for BigQuery
                        if(schema == null)
                        {
                             schema = getSchemaFromResultSet(resultSet);
                        }
                         
                        //Creating a List of Strings for each Record that comes back from JDBC Driver.
                        for(int i=1; i<= resultSet.getMetaData().getColumnCount(); i++ )
                        {
                            addRow.add(i-1, String.valueOf(resultSet.getObject(i)));
                        }
                         
                        //LOG.info(String.join(",", addRow));
                         
                        return addRow;
                    }
                })
                )
  6. Once you have the data in PCollection, apply the transformation and hash the email address in the data as shown below by using ParDo to iterate through all the items in PCollection.
    .apply(ParDo.of(new DoFn<List<String>, List<String>>() {
                       @ProcessElement
                       //Apply Transformation - Mask the EmailAddresses by Hashing the value
                       public void processElement(ProcessContext c) {
                            
                           List<String> record = c.element();
          
                           List<String> record_copy = new ArrayList(record);
                           String hashedEmail = hashemail(record.get(11));
                           record_copy.set(11, hashedEmail);
                            
                           c.output(record_copy);
                            
                       }
                   }));
  7. Then, convert the PCollection, which has each row in a list format, to the TableRow object of the BigQuery model.
    PCollection<TableRow> tableRows =  rows.apply(ParDo.of(new DoFn<List<String>, TableRow>() {
             @ProcessElement
    //Convert the rows to TableRows of BigQuery
             public void processElement(ProcessContext c) {
                  
                 TableRow tableRow = new TableRow();
                 List<TableFieldSchema> columnNames = schema.getFields();
                 List<String> rowValues = c.element();
                 for(int i =0; i< columnNames.size(); i++)
                 {
                     tableRow.put(columnNames.get(i).get("name").toString(), rowValues.get(i));
                 }
     
                 c.output(tableRow);
             }
         }));
  8. Finally, write the data to BigQuery using the BigQueryIO.writeTableRows() method, as shown below.
    //Write Table Rows to BigQuery         
         tableRows.apply(BigQueryIO.writeTableRows()
                 .withSchema(schema)
                 .to("nodal-time-161120:Oracle.CUSTOMER")
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Note: Before you run the pipeline, go to the BigQuery console and create a table with the same schema as your Oracle Table.

You can find all the code for this project in GitHub for your reference.

Running the Pipeline

  1. Go to Run > Run Configurations. Under Pipeline Arguments, you should see two different options to run the pipeline.  
    • DirectRunner: Runs the pipeline locally
    • DataFlowRunner: Runs the pipeline on Google Cloud DataFlow  
  2. To run locally, set the Runner to DirectRunner and run it. Once the pipeline has finished running, you should see your Oracle data in Google BigQuery.
  3. To run the pipeline on Google Cloud Dataflow, set the Runner to DataFlowRunner and make sure that you choose your account, project ID, and a staging location, as shown below.
  4. Google Cloud Data Flow
  5. Under Arguments > Program Arguments, set the path to tempLocation for BigQuery Write to store temporary files, as shown below.
  6. Google Cloud Data Flow
  7. Click on Run and you should now see a new job in Google Dataflow console starting. You can track the progress by clicking on the job and you should see a flowchart to show the status of each stage.
  8. Google Cloud Data Flow
  9. Once the pipeline has run successfully, you can go to the Google BigQuery console and run a query on the table to see all your data.
  10. Google Cloud Data Flow

We hope that this tutorial helped you to get started with how you can ETL on-premises Oracle data into Google BigQuery using Google Cloud Dataflow. You can use a similar process with any of the Hybrid Data Pipeline’s supported data sources.  

Hortonworks Community Connection (HCC) is an online collaboration destination for developers, DevOps, customers and partners to get answers to questions, collaborate on technical articles and share code examples from GitHub.  Join the discussion.

Topics:
big data ,ingesting ,bigquery ,google cloud dataflow ,tutorial

Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}