
Advanced Apache NiFi Flow Techniques


Learn how to serialize NiFi flows and then reconstitute them with data and metadata at a later time and continue your flow processing.



Let's start with FlowFile continuation.

FlowFile Continuation

Sometimes, you need to back up your currently running flow, let that flow run at a later date, or make a backup of what is in process now. You want this in permanent storage so you can reconstitute it later, like orange juice, and add it back into the flow or restart it.

This could be for recovering from failures, for integration testing, for testing new versions of components, as a checkpoint, or for many other purposes. You don't always want to reprocess the original source or files (they may be gone).


Option 1

You can save the raw data that originally came in to local files or HDFS, then read it back out later.
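In the flow itself, that usually means PutFile or PutHDFS on the way in and GetFile or ListHDFS/FetchHDFS when you want to replay. Outside the flow, a minimal Hadoop-client sketch of the same round trip looks like this (the namenode URI and paths are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class RawDataArchive {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf)) {
            // Archive the raw source file as it arrived (what PutHDFS does in the flow).
            fs.copyFromLocalFile(new Path("/opt/demo/raw/904381478117605.json"),
                    new Path("/archive/raw/904381478117605.json"));

            // Later, pull it back down to replay it through the flow
            // (what ListHDFS/FetchHDFS or GetHDFS do in the flow).
            fs.copyToLocalFile(new Path("/archive/raw/904381478117605.json"),
                    new Path("/opt/demo/replay/904381478117605.json"));
        }
    }
}

The catch with this option is that you only keep the content; any FlowFile attributes your flow has added along the way are lost, which is why Option 2 is preferred.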

Option 2

Preferred: MergeContent to FlowFileV3, then reload with a Get* processor to IdentifyMimeType to UnpackContent.

After using MergeContent with FlowFileV3, you can use PutFile, PutS3Object, PutHDFS, or other file-saving processors. Perhaps send it to an FTP or SFTP server for storage elsewhere.
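The FlowFile Stream, v3 format that MergeContent writes packages each FlowFile's attributes together with its content. If you ever want to generate or test that format outside a flow, here is a minimal sketch using the FlowFilePackagerV3 class from NiFi's nifi-flowfile-packager module (the output path and attribute values are hypothetical examples):

import org.apache.nifi.util.FlowFilePackagerV3;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class PackageFlowFile {
    public static void main(String[] args) throws Exception {
        // The content and attributes to keep together in one .pkg file.
        byte[] content = "[{\"tempf\": 73.02, \"humidity\": 29.7}]".getBytes(StandardCharsets.UTF_8);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("mqtt.topic", "sensor");
        attributes.put("filename", "904381478117605");

        // FlowFilePackagerV3 writes the FlowFile Stream, v3 layout: a NiFiFF3 header,
        // the attribute map, then the content bytes.
        FlowFilePackagerV3 packager = new FlowFilePackagerV3();
        try (OutputStream out = new FileOutputStream("/opt/demo/flow/904381478117605.pkg")) {
            packager.packageFlowFile(new ByteArrayInputStream(content), out, attributes, content.length);
        }
    }
}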

Now, you have a PKG file.

cat /opt/demo/flow/904381478117605.pkg
NiFiFF3+tempf73.02sql.args.2.value29.7sql.args.11.type3roll353.9306742667328
mqtt.brokertcp://m13.cloudmqtt.com:14162sql.args.4.type3uuid$9f2f8b6f-2870-40a3-a460-49427cddf9a8
mqtt.topicsensorsql.args.7.type3sql.args.7.value353.9306742667328path./sql.args.4.value33.9sql.args.9.value-0.0sql.args.1.type1humidity29.7pitch14.015266431562901
nf.file.path.mqtt.qos0sql.args.8.type3temp33.9sql.args.1.value34sql.args.2.type3sql.args.10.type3sql.args.8.value128.4983979122009sql.args.5.type3sql.args.6.value14.015266431562901sql.args.3.value1011.1sql.args.10.value-0.0mqtt.isDuplicatefalspressure1011.1mqtt.isRetainedfalseyaw128.4983979122009cputemp3filename904381478117605sql.args.11.value1.0sql.args.9.type3x-0.0y-0.0z1.0sql.args.6.type3
nf.file.name904381478117605sql.args.5.value73.02sql.args.3.type3�[{"tempf": 73.02, "pressure": 1011.1, "pitch": 14.015266431562901, "temp": 33.9, "yaw": 128.4983979122009, "humidity": 29.7, "cputemp": "34", "y": -0.0, "x": -0.0, "z": 1.0, "roll": 353.9306742667328}]%

You can now reload that FlowFileV3 at any time, send it to IdentifyMimeType (so it knows it's a FlowFileV3), and then use UnpackContent to reconstitute it into the original flow file. Now you can use it as if it had never stopped and been written to disk. You have an unlimited queue for storing pre-processed or partially processed files. This saves time! You could run really expensive processes once, save the preprocessed items, files, or models, and reuse them everywhere.
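UnpackContent handles that reconstitution inside the flow, but the same .pkg file can also be opened outside NiFi. A minimal sketch with FlowFileUnpackagerV3 (from the same nifi-flowfile-packager module; the path is the example file from above) prints the attributes and content that were preserved:

import org.apache.nifi.util.FlowFileUnpackagerV3;

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class UnpackFlowFile {
    public static void main(String[] args) throws Exception {
        FlowFileUnpackagerV3 unpackager = new FlowFileUnpackagerV3();
        try (InputStream in = new FileInputStream("/opt/demo/flow/904381478117605.pkg")) {
            // A single .pkg file can hold one or many packaged FlowFiles.
            do {
                ByteArrayOutputStream content = new ByteArrayOutputStream();
                Map<String, String> attributes = unpackager.unpackageFlowFile(in, content);
                System.out.println("attributes: " + attributes);
                System.out.println("content:    " + new String(content.toByteArray(), StandardCharsets.UTF_8));
            } while (unpackager.hasMoreData());
        }
    }
}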

In MergeContent, choose FlowFile Stream, v3 as the merge format.

Thanks to Joe Witt for the explanation of this process.


Topics:
big data, nifi, streaming, machine learning, flows
