Dataflows in Enterprise Using Apache NiFi

In this article, we'll categorize different types of data access into patterns and build dataflows to process data.

Enterprises consume data from a variety of sources. In this article, we'll categorize these different types of data access into patterns and build dataflows to process data. We'll create dataflows using an open-source tool, Apache NiFi, for each type of data access pattern. These dataflows will model the entire process of data movement as the data is processed by different components.

Data Access Patterns

We can divide different types of data access into three categories:

  1. Batch — Customers make data available by uploading tarballs. In many enterprises, this is still the most common data consumption format.
  2. Push — Enterprises typically subscribe to various data streaming sources, and these sources push data to the subscribers.
  3. Pull — Data may reside in internal or external data sources, and the preferred mode of consumption is pulling the data on demand, e.g., by calling a REST API.

Scenario

To illustrate these data access patterns with concrete use cases, let us assume a hypothetical enterprise that uses sensors to measure different physical characteristics of an instrument, e.g., temperature, pressure, etc. These sensors provide sensor data tags as time series data. Some sensors may be installed in locations with good connectivity to the cloud, where they can post their readings (Push mode), whereas others may store readings in on-premise time series datastores (Pull mode). Still others may dump readings into files (Batch mode). We will perform a simple transformation of the data for illustrative purposes. The sensors may have been installed by different technicians and configured differently. For example, one temperature sensor may name the temperature tag TEMP, while another calls it TEMPERATURE. Tag names may also change due to data corruption in transit. We will develop components to process this raw, unclean data into a clean format with standard tag names, ready for downstream processing. The overall high-level dataflow is illustrated below.

[Image: overall high-level dataflow]

Prerequisites

Before we develop dataflows for each pattern, we need the following two artifacts, which will be used later.

1. A TagMapping processor in Apache NiFi that suggests a standard tag name for a given input tag.

We will use the ExecuteScript NiFi processor to develop a scripted processor in Python. The Python package difflib has a function, get_close_matches, that returns a list of the best matches. The code for the processor is shown below.

import json
import difflib
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the incoming flowfile content as a JSON array of readings
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        ts_data = json.loads(text)
        std_tags = ['TEMPERATURE', 'PRESSURE']
        for item in ts_data:
            # Match case-insensitively so raw tags such as 'Temp' or 'TEMPR'
            # map to the closest standard tag; keep the original tag if no
            # candidate clears the 0.5 similarity cutoff
            matched = difflib.get_close_matches(item['tag'].upper(), std_tags, 1, 0.5)
            if matched:
                item['tag'] = matched[0]
        # Write the cleaned JSON back to the flowfile content as UTF-8 bytes
        outputStream.write(bytearray(json.dumps(ts_data).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "filename", 'tagmapped')
    session.transfer(flowFile, REL_SUCCESS)

Create an ExecuteScript processor with the Script Engine set to python and the above script as the Script Body, and save it with the name TagMapping. To test the processor, pass the JSON array shown below as a flowfile and examine the output, as illustrated in the diagram below.

[Image: TagMapping processor test output]
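
As a quick sanity check outside NiFi, the fuzzy matching used by the script can be exercised in plain Python. The following is only an illustrative sketch, using a few of the raw tag names from the sample readings below:

import difflib

# Standard tags used by the TagMapping processor above
std_tags = ['TEMPERATURE', 'PRESSURE']

# A few raw tag names taken from the sample readings below
raw_tags = ['Temp', 'TEMPR', 'TEMP1', 'Press', 'PRESS1']

for raw in raw_tags:
    # Same arguments as in the processor script: best single match, cutoff 0.5
    matched = difflib.get_close_matches(raw.upper(), std_tags, 1, 0.5)
    print(raw, '->', matched[0] if matched else raw)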

2. Schema of the time series sensor data.

Let us examine the sample sensor tag readings that we will receive, shown below.

[{"quality":3,"tag":"Temp","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"Temp","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"Press","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"Temp","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"Temperature","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"Pressure","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"TEMP","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"TEMP","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"TEMP1","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"TEMPR","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"PRESSURE","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"PRESSR","timestamp":1491019200000,"value":0.6},
{"quality":3,"tag":"PRESS1","timestamp":1491019200000,"value":0.6}]

We will use the InferAvroSchema NiFi processor for this purpose. Create a process group, call it "Infer Schema," and save it as shown below.

[Image: "Infer Schema" process group]

Configure the InferAvroSchema processor as shown below.

[Image: InferAvroSchema processor configuration]

Pass the JSON file containing the sample sensor tag data to the "Infer Schema" process group and examine the output. The generated Avro schema, timeseries.avro, should look like the following:

{
  "type" : "record",
  "name" : "json",
  "namespace" : "input",
  "fields" : [ {
    "name" : "quality",
    "type" : "int",
    "doc" : "Type inferred from '3'"
  }, {
    "name" : "tag",
    "type" : "string",
    "doc" : "Type inferred from '\"Temp\"'"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc" : "Type inferred from '1491019200000'"
  }, {
    "name" : "value",
    "type" : "double",
    "doc" : "Type inferred from '0.6'"
  } ]
}
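
If you want to sanity-check the inferred schema outside NiFi, the sketch below validates one sample reading against timeseries.avro. It assumes the fastavro Python library, which is not part of the NiFi flow and is used here purely for illustration:

import json

# fastavro is assumed here only for illustration; it is not part of the NiFi flow
from fastavro import parse_schema
from fastavro.validation import validate

# Load the schema generated by the InferAvroSchema processor
with open('timeseries.avro') as f:
    schema = parse_schema(json.load(f))

# One reading taken from the sample sensor data above
record = {"quality": 3, "tag": "Temp", "timestamp": 1491019200000, "value": 0.6}

# Returns True if the record conforms to the schema, raises otherwise
print(validate(record, schema))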

Now that we have a TagMapping processor and the schema of the incoming sensor data, let us develop dataflows for the three data access patterns.

Dataflows

Our dataflows consist of stages of processing specific to each pattern of data access. We will store all the contents after processing in HDFS. We will use the Parquet format to store contents for Batch mode, as the number of records is likely to be large enough to justify the overhead of a columnar storage format. For the Push and Pull modes, we will store the data in Avro format, a compact row-based binary format. The choice of Parquet and Avro here is for illustrative purposes only. If a use case typically retrieves all of the fields of a row in each query, Avro is usually the best choice. On the other hand, if the dataset has many columns and the use case typically involves working with a subset of those columns rather than entire records, Parquet is optimized for that kind of access.
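
To make the row-versus-column distinction concrete, here is a small standalone sketch, assuming the fastavro and pyarrow Python libraries (neither is part of the NiFi flow): it writes the same records in both formats and then reads back only the value column from Parquet, the kind of column-pruned read that a row-based format cannot do as cheaply:

import json
import fastavro
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical sample records in the sensor format described above
records = [
    {"quality": 3, "tag": "TEMPERATURE", "timestamp": 1491019200000, "value": 0.6},
    {"quality": 3, "tag": "PRESSURE", "timestamp": 1491019200000, "value": 0.7},
]

# Row-oriented: Avro stores complete records, so reads always return whole rows
with open('timeseries.avro') as f:
    schema = fastavro.parse_schema(json.load(f))
with open('sample.avro', 'wb') as out:
    fastavro.writer(out, schema, records)

# Column-oriented: Parquet lets a query read only the columns it needs
pq.write_table(pa.Table.from_pylist(records), 'sample.parquet')
values_only = pq.read_table('sample.parquet', columns=['value'])
print(values_only.to_pydict())  # {'value': [0.6, 0.7]}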

To get data for the Push model, we'll subscribe to a topic in Kafka; for the Pull model, we will use a time series database, InfluxDB, and query data using its HTTP API. The following diagram illustrates the individual steps.

[Image: dataflow steps for each access pattern]

Create DataSets

HDFS is our choice of sink. Install one of the Hadoop distributions, e.g., CDH or HDP, to use HDFS. We will use Kite, a high-level data layer for Hadoop; Kite's API and tools are built around datasets. Install the Kite SDK on the machine running Hadoop and create datasets to store our sensor data in Avro and Parquet formats on HDFS. The Avro schema timeseries.avro that is passed to kite-dataset was generated in the Prerequisites section above.

source /etc/hive/conf/hive-env.sh

kite-dataset create timeseries_avro -s timeseries.avro
kite-dataset create timeseries_parquet -s timeseries.avro --format parquet

Batch

The batch dataflow is illustrated below.

[Image: batch dataflow]

All the processors used are standard processors available with Apache NiFi. The configurable properties of the processors are fairly intuitive. Some of the important properties to be configured are outlined below.

1. The ConvertJSONToAvro processor requires the Record Schema property. This should be set to the schema derived earlier using the InferAvroSchema processor.

2. The StoreInKiteDataset processor requires the Target Dataset URI property. This can be obtained from the Kite SDK for the desired dataset as shown below.

> kite-dataset list

dataset:hive://<hostname>:9083/default/timeseries_avro

Push

In this mode, we use the ConsumeKafka processor to build the dataflow pipeline illustrated below. The Kafka Brokers property accepts a comma-separated list of Kafka brokers, and the Topic Name(s) property accepts a comma-separated list of topics to listen to. As each message is received on the configured Kafka topic(s), its tag name is replaced with a standard tag, the content is converted to Avro format, and the result is saved to the Kite dataset.

[Image: push dataflow using ConsumeKafka]
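
For readers who want to see what the ConsumeKafka step corresponds to outside NiFi, here is a minimal sketch using the kafka-python library; the broker address and topic name are assumptions for illustration only:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'sensor-readings',                   # hypothetical topic name
    bootstrap_servers='localhost:9092',  # comma-separated broker list, as in the Kafka Brokers property
    value_deserializer=lambda m: m.decode('utf-8'),
)

for message in consumer:
    # Each message value is a JSON array of readings like the sample above;
    # in the NiFi flow this content becomes a flowfile passed to TagMapping.
    print(message.value)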

Pull

We store sensor data in InfluxDB and pull time series records using its HTTP API. We can use the InvokeHTTP processor to GET sensor measurement records, or invoke the following curl command in an ExecuteStreamCommand processor.

curl -H "Accept: application/csv" -G 'http://localhost:8086/query?db=sensordb' 
     --data-urlencode 'q=SELECT * FROM "timeseries"'

The records are returned in CSV format.

tag,timestamp,value,quality
TEMP,1488327378000000000,11,3
PRESSURE,1488327378000000000,11,3

However, the TagMapping processor takes records in JSON format. To convert CSV to JSON, we use ConvertCSVToAvro and then ConvertAvroToJSON. This approach is the easiest, as it requires almost no extra work.

[Image: pull dataflow from InfluxDB]
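
As a standalone illustration of the Pull pattern, the sketch below queries the InfluxDB HTTP API with the Python requests library and reshapes the CSV response into the JSON array that the TagMapping processor expects. The host, port, database name, and column names follow the sample query and output above; treat them as assumptions for your own setup:

import csv
import io
import json

import requests

# Same query as the curl command above, issued over the InfluxDB HTTP API
response = requests.get(
    'http://localhost:8086/query',
    params={'db': 'sensordb', 'q': 'SELECT * FROM "timeseries"'},
    headers={'Accept': 'application/csv'},
)

# Parse the CSV rows (tag, timestamp, value, quality) into dictionaries
rows = list(csv.DictReader(io.StringIO(response.text)))

# Reshape into the JSON array format consumed by the TagMapping processor
readings = [
    {
        'tag': row['tag'],
        'timestamp': int(row['timestamp']),
        'value': float(row['value']),
        'quality': int(row['quality']),
    }
    for row in rows
]
print(json.dumps(readings))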

Conclusion

In this article, we looked at various data access patterns within an enterprise and divided them into three categories: batch, push, and pull. We then examined concrete use cases and developed dataflows using Apache NiFi for each category. These dataflows provided concrete steps to model the entire process of data movement as the data is processed by different components in the flow, from the source to the final storage, HDFS in our case.
