Gentle Introduction to Apache NiFi for Data Flow... and Some Clojure


Learn how to create a processor reading some data from a file, a processor writing data to a file, and a processor that can insert data in a MongoDB collection.


I have been using Apache Camel for data flow for a long time. Apache Camel is an ultra-clean way to code data flows with a fantastic DSL, and it comes with an endless list of components to manage your data flow.

This could be the best possible way of making a simple API that every developer understands and making your data flow as clean and reusable as possible.

Now, strong with your newly designed flow, you take your whole data flow to a customer and they ask you to explain what the designed flow is doing. You start opening your favorite text editor, show them a few lines of code, and see each face just disintegrate in front of you while everyone in the room is just waiting for that coffee break to get closer. Even concepts like reading a log file or parsing an output seem to come from a different planet for a non-technical person who came to your meeting for the first time. If you have to explain your work to non-technical people, a data flow system with a graphical representation of the flow is a must. Enter Apache NiFi.

NiFi in itself is not that much of a new project anymore — it has been around for the last two years already. But it sure builds on strong foundations from similar data flow systems.

What does it do?

Let's reuse the project's bang-on one-sentence description:

"Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic."

Basically, it can route your data from any streaming source to any data store, using the same graphical view to design, run, and monitor your flows, with a complete transaction overview and integrated version-controlled flows. Of course, each flow is reusable, can be templated, and can be combined. Last but not least, NiFi shines with low-latency streaming flows as well as with heavyweight batch data transfers.

Quite quickly, you would eventually be able to draw flows like this:

In this post, we will see how to write a simple flow that:

  • Reads data from a file.
  • Outputs data to MongoDB.
  • Writes a custom processor in Clojure.

There are Docker images for NiFi, but we will start the good old-fashioned way: downloading a ZIP file with pretty much everything needed to get started. Head to the download section and retrieve the ZIP or tar.gz archive.

The content of the archive is rather compact, as seen in the screenshot below.

Supposing you have a Java runtime installed, you can get NiFi running by using the bin/nifi.sh script (on Linux or Mac) or bin/run-nifi.bat for Windows.

The service is a bit slow to start, so do not hurry before reaching http://localhost:8080/nifi/.
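Since there is no definitive "ready" message to watch for, a small polling loop can save you from refreshing the browser by hand. This is just a sketch: the NiFi UI address and the 60-second budget are assumptions, and it only relies on curl being installed.

```shell
# Sketch: poll a URL until it answers or we give up.
# The NiFi UI address and the retry budget are assumptions.
wait_for_ui() {
  url="$1"
  tries="${2:-60}"
  i=0
  while [ "$i" -lt "$tries" ]; do
    # -s: silent, -f: fail on HTTP errors, -o: discard the body
    if curl -sf -o /dev/null "$url"; then
      echo "up"
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  echo "timed out"
  return 1
}

# Example: wait_for_ui "http://localhost:8080/nifi/"
```

Once the function prints "up", the workspace is ready to load.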

Once you have reached the page, you'll see an empty workspace like the one below:

We will now create a processor that reads content from a file.

You can find the processor widget at the top left of the toolbar.

Click on it and drag it onto the main grid. This will open the dialog to select which processor to insert into the flow.

You can click around to see all the different processors out of the box. (And there are a lot of them!) For now, to read a file, we will select the one named GetFile.

Clicking once gives you an extended description of the processor, and double-clicking inserts the processor inside the worksheet.

The processor is inserted. Now, let's configure its settings by double-clicking on it. The settings we are interested in are in the PROPERTIES tab.

Let's specify the input directory. You could use a path from your local machine directly, but to make this reusable, we will use a variable instead. (We will see how to set up this variable soon.) Click on the Input Directory Value cell to open the dialog below.

Then hit OK.

In the Settings tab, there is one more step: tell the processor to automatically terminate the success relationship, since we have not yet defined any other processor to pick up the data (not yet, at least; one thing at a time). For that, tick the checkbox next to success on the right-hand side.

The processor is set up. Simply click on APPLY and you are back to the main workspace.

Now to specify the variable that will be used in the processor that was just defined, right-click on the workspace to show the contextual menu below.

Then, click Variables. To set up a new variable, click on the +.

Enter the folder.input variable with a path specific to your machine.

That's it for our flow variable. Note that those variables can be used and reused all around the data flow.
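Anywhere a processor property supports the NiFi Expression Language, a variable is referenced by wrapping its name in ${...}; for example, the GetFile property we just set reads as:

```
Input Directory: ${folder.input}
```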

The basics are in place. Right-click anywhere in the worksheet, and in the menu, press Start.

At this stage, the static display is slightly out of sync, so a quick page refresh will show the processor is running (see the menu bar).

Let's start playing with it.

Inserting data on Linux or Mac can be done with a simple echo command on the command line. On Windows, you could open a text editor and save the file in the folder that was defined as the folder.input flow variable.
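A minimal sketch of that echo step; the folder path here is an assumption, so substitute whatever you set the folder.input variable to.

```shell
# Hypothetical path; replace with the value of your folder.input variable.
mkdir -p /tmp/nifi/input
echo "Hello NiFi!" > /tmp/nifi/input/hello.txt
```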

The file actually disappeared almost immediately!

The file has already been processed, and since there is only one processor in our flow right now, it was removed and simply disappeared.

To find out what happened to the data, we can follow the data flow using NiFi's data tracing feature. This can be accessed from the top right menu by selecting Data Provenance. Note that this applies to the data flow of the whole worksheet.

That opens up a dialog with the list of all the data passing in the flow.

The Show Lineage button on the far right shows the progression of the data through the various steps of the data flow.

We can also view the content of the message that was going through the flow at any step by double-clicking the flow step. This opens a familiar-looking dialog.

From there, you can click on the Download or View button to see the content of the data that went through that step. Hello!

To make it slightly more interesting, let's see how to move the data from one processor to another one. This time, the processor will output the data it receives to the file system.

The processor we will be using is the PutFile, which you can find in the dialog to insert a new processor, as done previously.

We will define a new flow variable, folder.output, and use it in the settings of the processor.

And do not forget to define the variable itself.

Just like most drag-and-drop data flow systems, you can connect two processors by simply clicking on one and going all the way to the other processor. A new dialog appears.

At this stage, click on ADD without changing any of the settings. Now, the two processors are linked, as seen below.

Before starting the second processor, make sure it terminates properly by ticking the failure and success checkboxes.

Now, you can start the non-started processor using the contextual menu of the worksheet as before.

And see the number of started processors increase.

With the two processors ready, we can now insert some text again in the input folder, the source folder for the GetFile processor.

The file disappears again immediately, and looking in the ${folder.output} folder, you can see the file has been piped there!

Fantastic... what's next?

Now that we are comfortable going from text files to text files, let's see how to insert that data into MongoDB. Yes, I love MongoDB.

We suppose here there is a local MongoDB instance running on port 32768.

The new processor to insert, named PutMongo, works in a similar fashion to PutFile, and its settings are quite minimal.

We also create a quick link between the GetFile processor and the new PutMongo processor, as shown below:

MongoDB stores JSON documents, so the new text we will send to the data flow will contain JSON data.
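As a sketch, dropping a JSON document into the input folder could look like this; the path and the field names are made up, so adapt them to your folder.input variable.

```shell
# Hypothetical path; replace with the value of your folder.input variable.
mkdir -p /tmp/nifi/input
cat > /tmp/nifi/input/user.json <<'EOF'
{"name": "nifi", "message": "hello mongo"}
EOF
```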

The inserted data flows through quite quickly, and we can now query the MongoDB collection to see that a new entry has been inserted:


Without going through all the nasty details, you can also create your own processors using scripting. The processor to use is named ExecuteScript.

Most of the major JVM languages can be used for scripting:

And here we go with a quick entry for Clojure.

The script itself will be stored in a file named hello.clj, located wherever you choose.

You can pretty much do anything using the NiFi API. 

hello.clj receives a flow element from the session object. The session object itself is a global reference passed to the script.

The rest is mostly writing bytes to the flow itself via what is called a flow file, which contains the data of the data flow at the ExecuteScript processor step.

```clojure
; make the NiFi callback interface visible to the script
(import org.apache.nifi.processor.io.OutputStreamCallback)

;;
;; HELPERS
;;
(defn output-stream-callback [flow]
  (reify OutputStreamCallback
    (process [this outputStream]
      (.write outputStream (.getBytes (str "i have seen clojure♪"))))))

(defn put-sample-content [flow]
  (.write session flow (output-stream-callback flow)))

(defn success-transfer [flow]
  (.transfer session flow REL_SUCCESS))

;;
;; MAIN
;;
(let [flowFile (.get session)]
  (-> flowFile
      put-sample-content
      success-transfer))
```

Now, piping data in again causes new data to be inserted into MongoDB — this time, coming from the Clojure script.

In this rather long blog post, you have seen how to:

  • Install NiFi.
  • Create a first processor reading some data from a file.
  • Create a second processor writing data to a file.
  • Create a processor that can insert data in a MongoDB collection.
  • Write your processor in Clojure using the NiFi API.

Sweet. Now's your time to explore the NiFi processors and create your own flows!


Published at DZone with permission of Nicolas Modrzyk. See the original article here.

