Apache NiFi is an open-source project that was built for data flow automation and management between different systems. Some of the key features of NiFi, in addition to data flow, are ease of use with a drag-and-drop UI and easily scalable to run a single server or in a clustered mode across many servers. Interesting trivia about Apache Nifi is that it actually has its roots in the NSA and was donated to the Apache Software Foundation in late 2014.
To get you started with Apache NiFi, we will walk you through how to ingest Salesforce data into Hive tables with the help of Progress DataDirect JDBC drivers.
This tutorial assumes that you have Apache NiFi and a DataDirect JDBC driver. If you do not have those items, please follow these quick steps:
Download Apache NiFi
- Download Apache NiFi from here.
- Extract the contents of the package to your desired location.
Install Progress DataDirect Salesforce JDBC driver
- Download DataDirect Salesforce JDBC driver from here.
- To install the driver, execute the JAR package by running the following command in terminal or just by double clicking on the jar package:
java -jar PROGRESS_DATADIRECT_JDBC_SF_ALL.jar.
- This will launch an interactive Java installer to install the Salesforce JDBC driver to your desired location as either a licensed or evaluation installation.
- Note that this will install Salesforce JDBC driver and other drivers for your trial purposes in the same folder.
Add Drivers to the NiFi Classpath
- Go to the install location of Progress DataDirect drivers, which is usually at
- Copy the Salesforce JDBC driver (
sforce.jar) and Hive JDBC driver (
hive.jar) from the lib folder in the above location to
- If you have already started NiFi at this point, you will have to restart it to be able to use the drivers from NiFi.
Set Up the Salesforce JDBC Driver for the NiFi Flow
- Run the NiFi if you haven’t started it yet. You can do this by running
bin/nifi.shrun command on your terminal. Note that your password must be the root directory of NiFi when you run this command.
- Open your browser and go to
http://localhost:8080/nifito access the NiFi UI, where you can create the flow. You should be seeing a canvas as shown below when the page loads.
- To start with building the flow, let’s start with configuring the JDBC drivers first. On your canvas, there are two side bars, one says Navigate and another says Operate. Under the Operate open the settings for the process group named NiFi Flow as shown below.
- You should now see a pop with two tabs named General and Controller Services. Go to the Controller Services tab and click on the + button to add a new controller service.
- You should now see a new popup displaying various Controller services you can choose from. Choose DBCPConnectionPool as your controller service and click on Add button as shown below.
- Click on Edit button of the newly created controller service and you should now see a screen as shown below
- Fill in the details to connect to Salesforce as below.
- Go to Settings tab and rename the controller service if you want to. I changed mine to SalesforceConnect and click on Apply button to save the settings.
- Now it’s time to enable this controller service. On the Controller Service, you should find an Enable button between edit and delete as shown below. Click on it to enable the service.
Set Up the Hive JDBC Driver for the NiFi Flow
Create another controller service for configuring the Hive connection. Repeat steps 4 through 9 above, except in step 7, substitute the details for the Hive driver as follows:
Once you have both the Salesforce and Hive drivers configured, this is how the controllers will look after they have been enabled:
Close the NiFi Flow Configuration popup and you should now be back on the canvas. Let’s build the flow now that we have everything in place.
Build the NiFi Flow
To build the NiFi Flow, first we would have to read the data from Salesforce and it can be done using a processor called QueryDatabaseTable, which supports incremental pulling of the data.
Drag a processor on to the canvas and choose QueryDatabaseTable and add it onto the canvas as shown below.
Right click on the QueryDatabaseTable processor and choose to Configure. Go to the tab Scheduling and choose the what kind of Scheduling strategy you would like to have and set the schedule when this flow should run.
Go to Properties tab and under it configure it as follows:
- Database connection pooling service: Choose the controller service where you configured Salesforce Connection in previous steps. In my case, I chose the controller SalesforceConnect where I configured the connection to Salesforce using the Progress DataDirect JDBC driver.
- Table name: Choose the table that you would like to ingest incrementally. I choose to sync the table Opportunity.
Below is a screenshot of the final settings for your reference:
Click on Apply to save the processor configuration.
Drag another processor from the menu and choose SplitAvro as your processor from the list. This will split the avro binary output that comes out of the QueryDatabaseTable processor if it’s too big.
Connect the processor QueryDatabaseTable to SplitAvro. As soon as you connect both the processors you will see a configuration pop up for that connection show up. There is no configuration needed here; just click on Add.
You should now have a flow as shown below. Notice that QueryDatabaseTable processor no longer has the warning, which means it is properly configured. There are still warning signs on SplitAvro because we didn’t yet configure it.
Right-click on the SplitAvro processor and choose Configure. On the settings tab, choose to automatically terminate the relationships for failure in processor and for the original as we don’t need the original avro binary output as shown below. We will be using the only split relationship to go ahead.
These are the only settings that you need to take care of here. You can control how many records each split data file can contain from the properties tab, but I wouldn’t be going in to those today as I will be using default values for this tutorial. You can click on Apply and it will save the configuration for SplitAvro.
Drag another processor from the menu and choose ConvertAvrotoJSON to be your processor. This processor will convert all the avro records to JSON format.
Connect SplitAvro and ConvertAvrotoJSON processors and you should now see the configuration for connection between these two processors. Under for relationships, as we only need the split, choose it and click on Add as shown below.
Notice that all the warnings for the processor are cleared. You should now see warnings only for the ConvertAvrotoJSON processor
Right-click ConvertAvrotoJSON processor and choose Configure.
Under the Settings tab, choose to automatically terminate relationships for failure as shown below. Click on Apply to save the configuration.
Drag another processor from the menu and choose ConvertJSONtoSQL processor. This processor will help in creating Insert statements for the destination table.
Connect ConvertAvrotoJSON and ConvertJSONtoSQL processor and you should see a popup for connection configuration. Enable the connection for the relationship success and click on Add as shown below.
Notice that all the warnings for the processor ConvertAvrotoJSON are cleared. You should now see warnings only for the ConvertJSONtoSQL processor.
Configure the ConvertJSONtoSQL Processor
Before going ahead, if you don’t have an opportunity table with the same schema as Salesforce, you would have to create it first. I have tried to find if there was an automated way of achieving this, but I couldn’t find any. If you find a way to do this let me know in comments.
Right click on ConvertJSONtoSQL and choose to configure. On the Settings tab, choose to automatically terminate relationships on failure and original as shown below.
Go to the Properties tab and fill in the details as follows.
JDBC Connection Pool: Choose the controller service where you configured Hive Connection in previous steps. In my case, I chose the controller HiveConnect where I configured the connection to Hive using the Progress DataDirect JDBC driver.
Statement type: Insert.
Table name: The table where you want to insert the data in Hive.
The last processor that we need is the PutSQL processor that puts the Insert statements created in the processor ConvertJSONtoSQL. Connect the ConvertJSONtoSQL processor to PutSQL processor.
You now see the pop up for the connection configuration and configure it as shown below and click on Add.
Rightclick on the PutSQL processor and click Configure. Under the settings, automatically terminate relationships for failure, retry, and success.
Under the Properties tab, configure the connection to Hive as follows. Here, I am using the controller service that I have created for Hive in previous steps.
Click Apply to save the configuration. Your flow should be looking as shown below and you should see no warnings for any of the processors.
Run the Flow
To start the flow, starting with the QueryDatabaseTable, right-click every processor and start them. Once the processors are started, you can see the data flow from Salesforce to Hive. Once the PutSQL processor has completed the process, you should now be able to find the data in your Hive instance.
If you have scheduled the QueryDatabaseTable to run after X seconds, then after X seconds, the flow will fetch incremental data pull from Salesforce and ingest it into Hive automatically.
We hope this tutorial has helped you to get started with Apache NiFi. Note that you can use similar flows to ingest your Eloqua, Google Analytics, Sugar CRM, SQL Server, and Oracle data to Hive instance using Progress DataDirect JDBC drivers.