{{announcement.body}}
{{announcement.title}}

Let's Build a Simple REST Ingest to Cloud Data Warehouse App With LowCode? Powered by Apache NiFi

DZone 's Guide to

Let's Build a Simple REST Ingest to Cloud Data Warehouse App With LowCode? Powered by Apache NiFi

A simple tutorial on using Apache NiFi to load data to any cloud.

· Integration Zone ·
Free Resource


Use NiFi to Call REST API, Transform, Route and Store the Data

Pick any REST API of your choice, but I have walked through this one to grab a number of weather stations reports.  Weather or not we have good weather, we can query it anyway.

We are going to build a GenerateFlowFile to feed our REST calls.

Java
 




x
22


 
1
[
2
{"url":"http://weather.gov/xml/current_obs/CWAV.xml"},
3
{"url":"http://weather.gov/xml/current_obs/KTTN.xml"},
4
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
5
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
6
{"url":"http://weather.gov/xml/current_obs/CWDK.xml"},
7
{"url":"http://weather.gov/xml/current_obs/CWDZ.xml"},
8
{"url":"http://weather.gov/xml/current_obs/CWFJ.xml"},
9
{"url":"http://weather.gov/xml/current_obs/PAEC.xml"},
10
{"url":"http://weather.gov/xml/current_obs/PAYA.xml"},
11
{"url":"http://weather.gov/xml/current_obs/PARY.xml"},
12
{"url":"http://weather.gov/xml/current_obs/K1R7.xml"},
13
{"url":"http://weather.gov/xml/current_obs/KFST.xml"},
14
{"url":"http://weather.gov/xml/current_obs/KSSF.xml"},
15
{"url":"http://weather.gov/xml/current_obs/KTFP.xml"},
16
{"url":"http://weather.gov/xml/current_obs/CYXY.xml"},
17
{"url":"http://weather.gov/xml/current_obs/KJFK.xml"},
18
{"url":"http://weather.gov/xml/current_obs/KISP.xml"},
19
{"url":"http://weather.gov/xml/current_obs/KLGA.xml"},
20
{"url":"http://weather.gov/xml/current_obs/KNYC.xml"},
21
{"url":"http://weather.gov/xml/current_obs/KJRB.xml"}
22
]


So we are using ${url} which will be one of these. Feel free to pick your favorite airports or locations near you. https://w1.weather.gov/xml/current_obs/index.xml

If you wish to choose your own data adventure, you can pick one of these others. You will have to build your own table if you wish to store it. They return CSV, JSON or XML, since we have record processors we don’t care. Just know which you pick.

Then we will use SplitJSON to split the JSON records into single rows.

Then use EvaluateJSONPath to extract the URL.

Now we are going to call those REST URLs with InvokeHTTP.

You will need to create a Standard SSL controller.

This is the default JDK JVM on Mac or some Centos 7.   You may have a real password, if so you are awesome.   If you don't know it, that's rough.   You can build a new one with SSL.

For more cloud ingest fun, https://docs.cloudera.com/cdf-datahub/7.1.0/howto-data-ingest.html.

SSL Defaults (In CDP Datahub, one is built for you automagically, thanks Michael).

Truststore filename: /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts 

Truststore password: changeit 

Truststore type: JKS 

TLS Protocol: TLS


StandardSSLContextService for Your GET ${url}



We can tweak these defaults.


Then we are going to run a query to convert these and route based on our queries.


Example query on the current NOAA weather observations to look for temperature in fareneheit below 60 degrees. You can make a query with any of the fields in the where cause. Give it a try!

You will need to set the Record Writer and Record Reader:

Record Reader: XML 

Record Writer: JSON


Java
 




xxxxxxxxxx
1


1
SELECT * FROM FLOWFILE
2
WHERE temp_f <= 60



Java
 




xxxxxxxxxx
1


 
1
SELECT * FROM FLOWFILE


Now we are splitting into three concurrent paths. This shows the power of Apache NiFi. We will write to Kudu, HDFS and Kafka.

For the results of our cold path (temp_f ⇐60), we will write to a Kudu table.

Kudu Masters: edge2ai-1.dim.local:7051 Table Name: impala::default.weatherkudu Record Reader: Infer Json Tree Reader Kudu Operation Type: UPSERT

Before you run this, go to Hue and build the table.

Java
 




xxxxxxxxxx
1
12


 
1
CREATE TABLE weatherkudu
2
(`location` STRING,`observation_time` STRING, `credit` STRING, `credit_url` STRING, `image` STRING, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
3
`station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE,  `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
4
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
5
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
6
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
7
`copyright_url` STRING, `privacy_policy_url` STRING,
8
PRIMARY KEY (`location`, `observation_time`)
9
)
10
PARTITION BY HASH PARTITIONS 4
11
STORED AS KUDU
12
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');



Let it run and query it.   Kudu table queried via Impala, try it in Hue.

The Second fork is to Kafka, this will be for the 'all' path.

Kafka Brokers: edge2ai-1.dim.local:9092 Topic: weather Reader & Writer: reuse the JSON ones

The Third and final fork is to HDFS (could be ontop of S3 or Blob Storage) as Apache ORC files. This will also autogenerate the DDL for an external Hive table as an attribute, check your provenance after running.

JSON in and out for record readers/writers, you can adjust the time and size of your batch or use defaults.

Hadoop Config: /etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/core-site.xml Record Reader: Infer Json Directory: /tmp/weather Table Name: weather

Before we run, build the /tmp/weather directory in HDFS and give it 777 permissions. We can do this with Apache Hue.


Once we run we can get the table DDL and location:


Go to Hue to create your table.


Java
 




xxxxxxxxxx
1


 
1
CREATE EXTERNAL TABLE IF NOT EXISTS `weather`
2
(`credit` STRING, `credit_url` STRING, `image` STRUCT<`url`:STRING, `title`:STRING, `link`:STRING>, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
3
`location` STRING, `station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE, `observation_time` STRING, `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
4
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
5
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
6
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
7
`copyright_url` STRING, `privacy_policy_url` STRING)
8
STORED AS ORC
9
LOCATION '/tmp/weather'



You can now use Apache Hue to query your tables and do some weather analytics. When we are upserting into Kudu we are ensuring no duplicate reports for a weather station and observation time.

Java
 




xxxxxxxxxx
1


 
1
select `location`, weather, temp_f, wind_string, dewpoint_string, latitude, longitude, observation_time
2
from weatherkudu
3
order by observation_time desc, station_id asc




Java
 




xxxxxxxxxx
1


1
select *
2
from weather



In Atlas, we can see the flow.   

How easy is that? 

Topics:
apache-kafka, apache-nifi, aws, azure, big data, cloudera, ingest, integration, rest, streaming

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}