
Write a Data Pipeline with Apache Falcon


The Falcon process described here is triggered when two conditions are met and then invokes an Oozie workflow that calls an SSH script.


In my last two posts (post 1, post 2), I provided a basic introduction to Apache Falcon. In this post, I will describe how to write a basic Falcon data pipeline.

The Falcon process I am going to describe is triggered when two conditions are met:

  1. The process start time (15:00 UTC) is reached.
  2. A trigger folder named ${YEAR}-${MONTH}-${DAY} is created in /tmp/feed-01/.

Once the Falcon process is triggered, it invokes an Oozie workflow, which calls an SSH script that simply prints the two input parameters to the /tmp/demo.out file on the local filesystem of the SSH box.
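
For example, an upstream job would satisfy the second condition for the 2015-09-08 instance by creating the trigger folder on HDFS (the date here is just an illustration):

hadoop fs -mkdir -p /tmp/feed-01/2015-09-08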

The code for the Falcon cluster (test-primary-cluster) is:



<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="test-primary-cluster" description="test-primary-cluster" colo="TEST DEV PRIMARY CLUSTER" xmlns="uri:falcon:cluster:0.1">
    <interfaces>
        <interface type="readonly" endpoint="hftp://localhost:50070" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://localhost:8020" version="2.2.0"/>
        <interface type="execute" endpoint="localhost:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.1.6"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/test-primary-cluster/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/test-primary-cluster/working"/>
    </locations>
    <ACL owner="rishav" group="hdpuser" permission="0770"/>
</cluster>

One important thing to note here is that you need to create the staging and working directories on HDFS with the proper permissions and ownership. On a Hortonworks cluster, the permissions and ownership below are needed:

hadoop fs -mkdir -p /apps/falcon/test-primary-cluster/staging/
hadoop fs -chmod 777 /apps/falcon/test-primary-cluster/staging/
hadoop fs -mkdir -p /apps/falcon/test-primary-cluster/working/
hadoop fs -chmod 755 /apps/falcon/test-primary-cluster/working/
hadoop fs -chown -R falcon:hadoop /apps/falcon/test-primary-cluster
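
You can verify the resulting permissions and ownership by listing the directories:

hadoop fs -ls /apps/falcon/test-primary-cluster/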

The code for the Falcon feed (feed-01-trigger) is:



<?xml version="1.0" encoding="UTF-8"?>
<feed description="feed-01-trigger"
        name="feed-01-trigger" xmlns="uri:falcon:feed:0.1">
        <frequency>days(1)</frequency>
        <late-arrival cut-off="hours(20)" />
        <clusters>
                <cluster name="test-primary-cluster" type="source">
                        <validity start="2015-09-07T14:00Z" end="2099-03-09T12:00Z" />
                        <retention limit="months(9999)" action="archive" />
                        <locations>
                                <location type="data" path="/tmp/feed-01/${YEAR}-${MONTH}-${DAY}" />
                        </locations>
                </cluster>
        </clusters>

        <locations>
                <location type="data" path="/tmp/feed-01/${YEAR}-${MONTH}-${DAY}" />
                <location type="stats" path="/none" />
                <location type="meta" path="/none" />
        </locations>

        <ACL owner="rishav" group="hdpuser" permission="0770"/>
        <schema location="/none" provider="none" />
</feed>

For this feed:

  • The retention limit is set to 9999 months.
  • The late-arrival cut-off is set to 20 hours.
  • The frequency is set to daily.
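
Although submitting and scheduling entities is covered in the next post, a definition can already be sanity-checked with the Falcon CLI's validate option (the file name here is an assumption):

falcon entity -type feed -file feed-01-trigger.xml -validate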

The code for the Falcon process (process-01) is:



<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="process-01"
        xmlns="uri:falcon:process:0.1">
        <clusters>
                <cluster name="test-primary-cluster">
                        <validity start="2015-09-08T15:00Z" end="2099-03-10T23:00Z" />
                </cluster>
        </clusters>
        <parallel>1</parallel>
        <order>FIFO</order>
        <frequency>days(1)</frequency>
        <timezone>UTC</timezone>
        <inputs>
                <input name="feed-01-trigger"
                        end="today(1,0)" start="today(0,0)"
                        feed="feed-01-trigger" />
        </inputs>
        <properties>
                <property name="workflowName" value="workflow-01" />
                <property name="input1" value="variable1" />
                <property name="input2" value="${formatTime(dateOffset(instanceTime(), -1, 'DAY'),'yyyy-MM-dd')}" />
        </properties>
        <workflow name="workflow-01"
                version="2.0.0" engine="oozie"
                path="/tmp/oozie_workflow" />
        <retry policy="periodic" delay="minutes(15)" attempts="2" />
        <ACL owner="rishav" group="hdpuser" permission="0770"/>
</process>

For this process:

  • The start time is set to 15:00 UTC.
  • The dependency is set to the input feed feed-01-trigger.
  • The retry policy is set to 2 attempts with a gap of 15 minutes.
  • The process also uses an EL expression to set the input2 variable to yesterday's date (see the bash equivalent below).
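
To see concretely what the EL expression produces, here is a rough bash equivalent (GNU date assumed): for the 2015-09-08T15:00Z instance, input2 evaluates to 2015-09-07.

# Rough bash equivalent of formatTime(dateOffset(instanceTime(), -1, 'DAY'), 'yyyy-MM-dd')
date -u -d "2015-09-08 15:00 UTC -1 day" +%Y-%m-%d    # prints 2015-09-07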

The Oozie workflow with the SSH action is defined below:



<workflow-app name="${workflowName}" xmlns="uri:oozie:workflow:0.1">
  <start to="demo_script"/>

  <action name="demo_script">
    <ssh xmlns="uri:oozie:ssh-action:0.1">
      <host>rishav@poc001</host>
      <command>~/demo.bash</command>
      <args>${input1}</args>
      <args>${input2}</args>
      <capture-output/>
    </ssh>
    <ok to="end"/>
    <error to="kill"/>
  </action>

  <kill name="kill">
    <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>

This Oozie workflow:

  • Gets the input1, input2, and workflowName variables from the Falcon process-01 process.
  • Invokes the shell script on the poc001 box with input1 and input2 as parameters.
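
Note that the Oozie SSH action requires passwordless (key-based) SSH from the user the Oozie server runs as to rishav@poc001. A minimal setup sketch, assuming the Oozie service user is oozie:

# Run on the Oozie server host as the oozie service user
ssh-keygen -t rsa                # accept the defaults, empty passphrase
ssh-copy-id rishav@poc001        # install the public key on the target box
ssh rishav@poc001 'hostname'     # verify password-less login works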

And the demo.bash script called by the Oozie SSH action is given below:



#!/bin/bash
# Append the current timestamp and the two input parameters to /tmp/demo.out
cd ~
echo `date` >> /tmp/demo.out
echo "input1 $1" >> /tmp/demo.out
echo "input2 $2" >> /tmp/demo.out

demo.bash is a simple script that echoes the current date, input1, and input2 to the /tmp/demo.out file.
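
After a successful run, /tmp/demo.out on poc001 should contain something like the following (input1 carries the literal value variable1 from the process definition, and input2 carries the previous day's date):

Tue Sep 8 15:02:11 UTC 2015
input1 variable1
input2 2015-09-07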

In my next post, I will explain how we can submit and schedule these Falcon processes.


Topics:
big data, apache, apache falcon, oozie, data pipeline

Published at DZone with permission of Rishav Rohit, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
