
Advanced Apache NiFi Flow Techniques


Learn how to serialize NiFi FlowFiles and then reconstitute them, with their data and metadata, at a later time to continue your flow processing.



Let's start with FlowFile continuation.

FlowFile Continuation

Sometimes, you need to back up your currently running flow, let that flow run at a later date, or make a backup of what is in process now. You want this in permanent storage so you can reconstitute it later, like orange juice from concentrate, and add it back into the flow or restart it.

This could be needed after failures, for integration testing, for testing new versions of components, as a checkpoint, or for many other purposes. You don't always want to reprocess the original source or files (they may be gone).


Option 1

You can save the raw data that originally came in to local files or HDFS (for example, with PutFile or PutHDFS), then read it back from there later (with GetFile, or ListHDFS followed by FetchHDFS), as sketched below.
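To make that concrete, here is a minimal sketch of Option 1 on the canvas. The processors named are standard NiFi processors, but the source processor and the directory path are assumptions for illustration only:

Save a raw copy:  ConsumeMQTT (or your source)  ->  PutHDFS (Directory: /data/raw/sensors)
Replay it later:  ListHDFS (Directory: /data/raw/sensors)  ->  FetchHDFS  ->  rest of your flow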

Option 2

Preferred: MergeContent to FlowFile v3, then reload later with a Get* processor to IdentifyMimeType to UnpackContent.

After using MergeContent with FlowFile v3, you can use PutFile, PutS3Object, PutHDFS, or other file-saving options. Perhaps send it to an FTP or SFTP server for storage elsewhere.
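If you ever want to produce this same package format outside of NiFi (for example, to hand-build test inputs), the packager class that, to my understanding, backs this merge format ships in NiFi's nifi-utils module. Here is a minimal sketch, assuming the org.apache.nifi.util.FlowFilePackagerV3 API (packageFlowFile(InputStream, OutputStream, Map, long)); the output path and sample attributes are hypothetical:

import org.apache.nifi.util.FlowFilePackagerV3;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class PackageFlowFileExample {
    public static void main(String[] args) throws Exception {
        // Content and attributes of the FlowFile we want to package (sample values only).
        byte[] content = "{\"temp\": 33.9, \"humidity\": 29.7}".getBytes(StandardCharsets.UTF_8);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("mqtt.topic", "sensor");
        attributes.put("filename", "904381478117605");

        // Write a FlowFile Stream, v3 package; /tmp/904381478117605.pkg is a hypothetical path.
        try (OutputStream out = new FileOutputStream("/tmp/904381478117605.pkg")) {
            new FlowFilePackagerV3().packageFlowFile(
                    new ByteArrayInputStream(content), out, attributes, content.length);
        }
    }
}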

Now, you have a PKG file.

cat /opt/demo/flow/904381478117605.pkg
NiFiFF3+tempf73.02sql.args.2.value29.7sql.args.11.type3roll353.9306742667328
mqtt.brokertcp://m13.cloudmqtt.com:14162sql.args.4.type3uuid$9f2f8b6f-2870-40a3-a460-49427cddf9a8
mqtt.topicsensorsql.args.7.type3sql.args.7.value353.9306742667328path./sql.args.4.value33.9sql.args.9.value-0.0sql.args.1.type1humidity29.7pitch14.015266431562901
nf.file.path.mqtt.qos0sql.args.8.type3temp33.9sql.args.1.value34sql.args.2.type3sql.args.10.type3sql.args.8.value128.4983979122009sql.args.5.type3sql.args.6.value14.015266431562901sql.args.3.value1011.1sql.args.10.value-0.0mqtt.isDuplicatefalspressure1011.1mqtt.isRetainedfalseyaw128.4983979122009cputemp3filename904381478117605sql.args.11.value1.0sql.args.9.type3x-0.0y-0.0z1.0sql.args.6.type3
nf.file.name904381478117605sql.args.5.value73.02sql.args.3.type3�[{"tempf": 73.02, "pressure": 1011.1, "pitch": 14.015266431562901, "temp": 33.9, "yaw": 128.4983979122009, "humidity": 29.7, "cputemp": "34", "y": -0.0, "x": -0.0, "z": 1.0, "roll": 353.9306742667328}]%

You can now reload that FlowFile v3 package at any time, send it to IdentifyMimeType (so the flow knows it's a FlowFile v3), and then use UnpackContent to reconstitute the original FlowFile, attributes and all. You can then use it as if it had never stopped and been written to disk. You effectively have an unlimited queue for storing pre-processed or partially processed files. This saves time! You could run really expensive processes once, save the preprocessed items, files, or models, and reuse them everywhere.
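If you want to inspect one of these .pkg files outside of NiFi, a matching unpackager lives alongside the packager in nifi-utils. Here is a minimal sketch, assuming the org.apache.nifi.util.FlowFileUnpackagerV3 API (unpackageFlowFile(InputStream, OutputStream) returning the attribute map) and using the demo path shown above:

import org.apache.nifi.util.FlowFileUnpackagerV3;

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;

public class UnpackFlowFileExample {
    public static void main(String[] args) throws Exception {
        try (InputStream in = new FileInputStream("/opt/demo/flow/904381478117605.pkg")) {
            ByteArrayOutputStream content = new ByteArrayOutputStream();
            // Copies the original content into the stream and returns the attribute map.
            Map<String, String> attributes = new FlowFileUnpackagerV3().unpackageFlowFile(in, content);

            attributes.forEach((k, v) -> System.out.println(k + " = " + v));
            System.out.println(content.toString("UTF-8"));
        }
    }
}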

When configuring MergeContent's Merge Format property, choose: FlowFile Stream, v3.
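For reference, here is my recollection of the handful of processor settings involved; treat the exact values as assumptions and verify them against your NiFi version's documentation:

MergeContent       Merge Format      = FlowFile Stream, v3
IdentifyMimeType   no special configuration; it should tag the package as application/flowfile-v3
UnpackContent      Packaging Format  = use mime.type attribute (or flowfile-stream-v3 explicitly)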

Thanks to Joe Witt for the explanation of this process.



