Arm Twisting Apache NiFi
Showcasing the flexibility of Apache NiFi.
Apache NiFi is a software project from the Apache Software Foundation, designed to automate the flow of data between software systems.
Early this year, I created a generic, metadata-driven data offloading framework using Talend. While championing that tool, many accounts raised concerns regarding the Talend license. While some were apprehensive of the additional cost, many others questioned the tool itself, because their account already had licenses for competing ETL tools like DataStage and Informatica (to name a few). A few accounts also wanted to know if the same concept of offloading could be made available using NiFi. Therefore, it was most logical to explore NiFi.
Without going into a detailed comparison between every ETL tool and NiFi, which would be a topic by itself, I would like to highlight one particular behavioral difference in the way the lifecycle of a workflow and its associated components is maintained and managed. In ETL, workflows have a definite start and a definite stop. In comparison, in NiFi, processors and components are always in a running state. If a processor does not have sufficient data to perform its task, it stays in a running state but sits idle. When the processor gets sufficient data, it processes the data (I'll refer to this state as 'executing'). Once the processing is over, the processor goes back to being idle.
In other words, in an ETL workflow, a component is always inactive until it gets control for execution. Only then does its state change to running, where it performs its action. Once the action is complete, control passes to the next component in the workflow. In comparison, in NiFi, all processors are always in a running state, but most are idle, waiting to receive sufficient data for execution. By design, in an ETL workflow, only one processor or component can be in a running state and have execution control. Parallel execution needs to be added explicitly. In the case of NiFi (by design), all processors run in parallel, which in turn makes it difficult to exactly replicate an ETL workflow, but that is a topic for another day.
Now, coming to the main point of the article. In the Talend-based offloading framework, as an example, I had a job type that allowed me to download the content of an RDBMS table to HDFS. Given the ETL design philosophy, each workflow had a definite start and a definite end. To kick off the offload operation, I needed to provide connection details, database details, the name of the table, and destination details. Then, the workflow would take these parameters, make a connection, download the data to HDFS, and terminate the operation after all the data was transferred. The workflow was made up of a couple of connected components. For example, the workflow had a MySQL connector component, an HDFS component, and a couple of other components to download the data and upload it to HDFS. On execution, control passed through each component one at a time, where the component performed its action and control passed to the next component.
When I wanted to implement the offloading framework using NiFi, I created a workflow using the same logic as was present in the Talend workflow. After the initial hiccups of getting accustomed to the NiFi user interface and the method of creating and configuring the workflow, I had a workflow that would work (at least that is what I thought). When I activated all the processors in the workflow, I found that the workflow did perform as expected and downloaded data from the source table (from MySQL) to the destination directory (in HDFS). After some time, however, I noticed that the destination directory was filling up: new files kept getting added.
Though I stopped all the processors, it took me some time to figure out the reason. Each processor in NiFi has a scheduled execution interval. By default, this interval is set to one second. As a result, immediately after a processor finishes execution, it waits for one second and then starts executing again. This behavior of processors executing continuously was quite novel, but it ran against the design of the offloading framework, because in an offloading scenario we want to fetch a data set from the source only once. Obviously, the solution to this problem was to try and stop the first processor in the workflow. With the first processor stopped, the other processors in the workflow would remain active but would not execute, as they would not get sufficient data to perform their action.
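The effect of the scheduling interval can be illustrated with a toy model. This is not NiFi code: the function name `run_flow`, the file names, and the idea of counting "ticks" are assumptions made purely for illustration. The source step fires once per scheduling tick for as long as it is running, and the downstream steps only have work when the source emits something.

```python
def run_flow(ticks, stop_after_first=False):
    """Toy model of a flow whose source fires once per scheduling tick.

    Hypothetical illustration only; it does not use any NiFi API.
    """
    source_running = True
    files_written = []
    for tick in range(ticks):
        if not source_running:
            # Downstream steps are still "running" but idle: nothing arrives.
            continue
        # The source fetches the table again and downstream writes a file.
        files_written.append(f"table_dump_{tick}")
        if stop_after_first:
            # The last step of the flow stops the source after one pass.
            source_running = False
    return files_written


if __name__ == "__main__":
    print(len(run_flow(5)))                     # source never stopped
    print(run_flow(5, stop_after_first=True))   # source stopped after one pass
```

With the source left running, every tick produces another copy of the table; once the source is stopped after the first pass, only one file is written and the rest of the flow simply waits.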
In the process of finding a solution, I came across NiFi's REST API. While the REST API does not have a direct method for what I wanted to do, it provided all the tools to allow me to do what I wanted.
The solution allowed me to convert the "always executing workflow and processors" of NiFi into a workflow that behaved quite similarly to a workflow in an ETL tool. Whether this is the correct approach is open to debate. Whether we should force-fit a powerful tool like NiFi to perform the task of data offloading is also open to debate. The NiFi REST API is quite powerful and allows us to control many aspects of NiFi, from managing a processor group, to managing individual processors, to managing the queues that connect the processors.
To achieve what I wanted, I created a workflow but added a special processor as the last element in the workflow. Using this processor, I sent a JSON message/configuration to the first processor in the workflow. The message essentially changed the state of the processor from running to stopped. Initially, all processors are in a running state. The first processor connects to the metadata table and fetches information about the table to be offloaded. The workflow then connects to the source database and starts the offloading activity. Once the offloading is complete, control passes to the special processor.
This processor sends a message to the first processor and stops it, effectively stopping further execution of the workflow. By stopping the first processor, we no longer perform the action of fetching the name of the table from the metadata. Due to this, all other processors, which depend on inputs from the previous processors, do not have sufficient information to continue execution and effectively remain in an idle state. To update the state of a workflow, we need to create a configuration string in JSON format and send it to the concerned processor.
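The JSON message described above might look like the following. This is a minimal sketch, not the original framework's code: it assumes an unsecured NiFi instance at `http://localhost:8080`, uses the `PUT /nifi-api/processors/{id}` endpoint, and the function names `build_stop_request` and `stop_processor` are hypothetical. NiFi expects the processor's current revision version in the request, so the sketch reads the processor first.

```python
import json
import urllib.request

# Assumed base URL of an unsecured NiFi instance; adjust for your setup.
NIFI_API = "http://localhost:8080/nifi-api"


def build_stop_request(processor_id, revision_version):
    """Build the JSON body that asks NiFi to set a processor to STOPPED."""
    return {
        "revision": {"version": revision_version},
        "component": {"id": processor_id, "state": "STOPPED"},
    }


def stop_processor(processor_id, base_url=NIFI_API):
    """Fetch the processor's current revision, then send the stop request."""
    url = f"{base_url}/processors/{processor_id}"

    # Read the processor entity to obtain its current revision version.
    with urllib.request.urlopen(url) as resp:
        entity = json.load(resp)

    body = build_stop_request(processor_id, entity["revision"]["version"])
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Print the payload only; calling stop_processor needs a live NiFi.
    print(json.dumps(build_stop_request("first-processor-id", 0), indent=2))
```

Inside a flow, the same request could be issued by whichever processor sits at the end of the workflow; the script form here only shows the shape of the message and the read-then-update sequence.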
In this article, I have described how I arm-twisted an Apache NiFi workflow to get it to do what I wanted: work like a more traditional ETL workflow. I agree that twisting the usage of the NiFi REST API in this way defeats the purpose of using NiFi and utilizing its power, but this article is a showcase for the flexibility of NiFi.
In subsequent articles, I will share my experience of creating a collection of Python scripts to automate some of NiFi's workflow management tasks.