Apache Falcon: Defining a Process With Multiple Hive Tables

Do you need to use two or more Hive tables as inputs for a single Falcon process? Check out this tutorial to find out how to use multiple Hive table inputs in Falcon.

Sometimes, we might need to use two or more Hive tables as inputs for a single Apache Falcon process. In this post, I am going to explain how to define a Falcon process that takes two input tables and stores its output in one Hive table.

Let's assume that there are two input tables, each partitioned by a feed_date column. For each feed_date, we want to count the records in each input table and store the table name along with its record count in an output table that is also partitioned by feed_date.
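The exact columns of the input tables don't matter for this example. A minimal sketch of their DDL, assuming a couple of simple string columns (id and val are placeholders), could be:

USE tmp_rishav
;
-- columns other than feed_date are illustrative
CREATE TABLE tbl_1 (id STRING, val STRING) PARTITIONED BY (feed_date DATE) STORED AS TEXTFILE
;
CREATE TABLE tbl_2 (id STRING, val STRING) PARTITIONED BY (feed_date DATE) STORED AS TEXTFILE
;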

The Falcon feed for the first Hive table is defined as:

<feed description="first input table" name="input-table-1" xmlns="uri:falcon:feed:0.1">
    <frequency>days(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(3)"/>

    <clusters>
        <cluster name="test-primary-cluster" type="source">
                        <validity start="2015-09-20T14:00Z" end="2099-03-09T12:00Z" />
                        <retention limit="months(9999)" action="archive" />
        </cluster>
    </clusters>

    <table uri="catalog:tmp_rishav:tbl_1#feed_date=${YEAR}-${MONTH}-${DAY}" />

    <ACL owner="rishav" group="hdpuser" permission="0770"/>
    <schema location="hcat" provider="hcat"/>
</feed>
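For table-based feeds like this one, a daily instance becomes available once the corresponding feed_date partition exists in the metastore. As an illustration, if data files for a day were placed directly on HDFS rather than loaded through an INSERT, registering the partition might look like this (date shown is just an example):

ALTER TABLE tmp_rishav.tbl_1 ADD IF NOT EXISTS PARTITION (feed_date='2015-09-28');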

And the Falcon feed for the second Hive table is defined as:

<feed description="second input table" name="input-table-2" xmlns="uri:falcon:feed:0.1">
    <frequency>days(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(3)"/>

    <clusters>
        <cluster name="test-primary-cluster" type="source">
                        <validity start="2015-09-20T14:00Z" end="2099-03-09T12:00Z" />
                        <retention limit="months(9999)" action="archive" />
        </cluster>
    </clusters>

    <table uri="catalog:tmp_rishav:tbl_2#feed_date=${YEAR}-${MONTH}-${DAY}" />

    <ACL owner="rishav" group="hdpuser" permission="0770"/>
    <schema location="hcat" provider="hcat"/>
</feed>

The Falcon feed for the output Hive table is defined as:

<feed description="output table for storing record count" name="rec-count-tbl" xmlns="uri:falcon:feed:0.1">
    <frequency>days(1)</frequency>
    <timezone>UTC</timezone>

    <clusters>
        <cluster name="test-primary-cluster" type="source">
                        <validity start="2015-09-20T14:00Z" end="2099-03-09T12:00Z" />
                        <retention limit="months(9999)" action="archive" />
        </cluster>
    </clusters>

    <table uri="catalog:tmp_rishav:rec_count_tbl#feed_date=${YEAR}-${MONTH}-${DAY}" />

    <ACL owner="rishav" group="hdpuser" permission="0770"/>
    <schema location="hcat" provider="hcat"/>
</feed>

Note: the DDL for the output Hive table is:

USE tmp_rishav
;
CREATE TABLE rec_count_tbl (tbl_name STRING, cnt INT) PARTITIONED BY (feed_date DATE) STORED AS TEXTFILE
;

The Falcon process, which reads the two input tables and writes its output to one table, is defined as:

<process name="multi-table-process" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="test-primary-cluster">
            <validity start="2015-09-26T15:00Z" end="2099-03-09T12:00Z" />
        </cluster>
    </clusters>

    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>days(1)</frequency>
    <timezone>UTC</timezone>

    <inputs>
        <input end="today(0,0)" start="today(0,0)" feed="input-table-1" name="input1"/>
        <input end="today(0,0)" start="today(0,0)" feed="input-table-2" name="input2"/>
    </inputs>

    <outputs>
        <output instance="today(0,0)" feed="rec-count-tbl" name="output"/>
    </outputs>

    <workflow engine="hive" path="/tmp/rishav/cnt_script.hql"/>

    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
</process>

The above Falcon process invokes a Hive script, given below:

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE ${falcon_output_database}.${falcon_output_table} PARTITION (${falcon_output_partitions_hive})
SELECT "${falcon_input1_table}" as tbl_name, count(*) as cnt from ${falcon_input1_database}.${falcon_input1_table} where ${falcon_input1_filter}
UNION ALL
select "${falcon_input2_table}" as tbl_name, count(*) as cnt from ${falcon_input2_database}.${falcon_input2_table} where ${falcon_input2_filter}
;

You might have noticed that we are using variables like falcon_output_database, falcon_input1_database, etc. These variables are defined by Falcon. For a complete list of the variables Falcon sets, check the Falcon UI log for your process and then the "Action Configuration" tab of the corresponding Oozie workflow.

Falcon sets up the following variables for an input Hive feed:

<param>falcon_input1_storage_type=TABLE</param>
<param>falcon_input1_catalog_url=thrift://localhost:9083</param>
<param>falcon_input1_table=tbl_1</param>
<param>falcon_input1_database=tmp_rishav</param>
<param>falcon_input1_filter=(feed_date='2015-09-28')</param>

The input1 part of each variable's name depends on the name we give to the input in the Falcon process definition.

If you have defined multiple Hive input feeds with names like input1, input2, input3, etc., you can refer to their variables by replacing input1 in the names above.
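For example, for the second input (named input2 in the process above), the corresponding parameters would look like this (values follow from the input-table-2 feed definition):

<param>falcon_input2_storage_type=TABLE</param>
<param>falcon_input2_catalog_url=thrift://localhost:9083</param>
<param>falcon_input2_table=tbl_2</param>
<param>falcon_input2_database=tmp_rishav</param>
<param>falcon_input2_filter=(feed_date='2015-09-28')</param>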

And for the output Hive feed, the following variables are defined:

<param>falcon_output_table=rec_count_tbl</param>
<param>falcon_output_partitions_java='feed_date=2015-09-28'</param>
<param>falcon_output_catalog_url=thrift://localhost:9083</param>
<param>falcon_output_partitions_pig='feed_date=2015-09-28'</param>
<param>falcon_output_partitions_hive=feed_date='2015-09-28'</param>
<param>falcon_output_dated_partition_value_feed_date=2015-09-28</param>
<param>falcon_output_storage_type=TABLE</param>
<param>falcon_output_database=tmp_rishav</param>
<param>falcon_output_partitions=feed_date='2015-09-28'</param>

Again, the output part of each variable's name depends on the name we give to the output in the Falcon process definition.
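Putting these values together, the Hive script above would resolve to roughly the following for the 2015-09-28 instance (table and database names are taken from the feed definitions; this expansion is shown for illustration only):

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE tmp_rishav.rec_count_tbl PARTITION (feed_date='2015-09-28')
SELECT "tbl_1" AS tbl_name, COUNT(*) AS cnt FROM tmp_rishav.tbl_1 WHERE (feed_date='2015-09-28')
UNION ALL
SELECT "tbl_2" AS tbl_name, COUNT(*) AS cnt FROM tmp_rishav.tbl_2 WHERE (feed_date='2015-09-28')
;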

Before submitting/scheduling these Falcon entities, we need to upload the Hive script to HDFS.
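Assuming the script is saved locally as cnt_script.hql, the upload could look like this (the HDFS path must match the path attribute of the workflow element in the process definition):

hdfs dfs -mkdir -p /tmp/rishav
hdfs dfs -put -f cnt_script.hql /tmp/rishav/cnt_script.hql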

Now we can submit and schedule these Falcon entities with the following commands:

falcon entity -type feed -submit -file input-table-1-feed.xml
falcon entity -type feed -submit -file input-table-2-feed.xml
falcon entity -type feed -submit -file rec-count-tbl-feed.xml
falcon entity -type process -submit -file multi-table-process.xml

falcon entity -type feed -name input-table-1 -schedule
falcon entity -type feed -name input-table-2 -schedule
falcon entity -type feed -name rec-count-tbl -schedule
falcon entity -type process -name multi-table-process -schedule
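Once the process is scheduled, the status of its instances can be checked with the Falcon CLI (the time window below is illustrative and should cover the instances you expect):

falcon instance -type process -name multi-table-process -status -start 2015-09-26T15:00Z -end 2015-09-28T15:00Z

After a successful run, the results can be verified by querying tmp_rishav.rec_count_tbl for the corresponding feed_date.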



Published at DZone with permission of Rishav Rohit, DZone MVB. See the original article here.
