Writing Custom Hive UDF and UDAF
User Defined Functions and User Defined Aggregate Functions allow you to completely customize how Hive evaluates data and manipulate queries.
Join the DZone community and get the full member experience.
Join For FreeIn this article, I am going to walk over easy to follow examples and show how to create Hive User Defined Functions (UDF) and User Defined Aggregate Functions (UDAFs), package into a JAR, and test in Hive CLI. So let’s begin.
In my Sqoop import article, I imported a customers table. Similarly, I have imported an orders table, which I used in my Hive Joins article. Also, I am using a dummy table for UDF verification. You can find the relevant Sqoop commands on GitHub.
Hive supports of a lot of built-in SQL-like functions in HiveQL. But just in case, if there is a need to write your own UDF, no one is stopping you.
UDF (User Defined Function)
Here I am going to show how to write a simple “trim-like” function called “Strip” – of course, you can write something fancier, but my goal here is to take away something in a short amount of time. So let’s begin.
How to Write a UDF function in Hive?
- Create a Java class for the User Defined Function which extends ora.apache.hadoop.hive.sq.exec.UDF and implements more than one evaluate() methods. Put in your desired logic and you are almost there.
- Package your Java class into a JAR file (I am using Maven)
- Go to Hive CLI, add your JAR, and verify your JARs is in the Hive CLI classpath
- CREATE TEMPORARY FUNCTION in Hive which points to your Java class
- Use it in Hive SQL and have fun!
There are better ways to do this, by writing your own GenericUDF to deal with non-primitive types like arrays and maps – but I am not going to cover it in this article.
I will go into detail for each one.
Create Java Class for a User Defined Function
As you can see below I am calling my Java class “Strip”. You can call it anything, but the important point is that it extends the UDF interface and provides two evaluate() implementations.
evaluate(Text str, String stripChars) - will trim specified characters in stripChars from first argument str.
evaluate(Text str) - will trim leading and trailing spaces.
package org.hardik.letsdobigdata;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class Strip extends UDF {
private Text result = new Text();
public Text evaluate(Text str, String stripChars) {
if(str == null) {
return null;
}
result.set(StringUtils.strip(str.toString(), stripChars));
return result;
}
public Text evaluate(Text str) {
if(str == null) {
return null;
}
result.set(StringUtils.strip(str.toString()));
return result;
}
}
Package Your Java Class into a JAR
There is a pom.xml attached in GitHub. Please make sure you have Maven installed. If you are working with a GitHub clone, go to your shell:
$ cd HiveUDFs
and run "mvn clean package". This will create a JAR file which contains our UDF class. Copy the JAR's path.
Go to the Hive CLI and Add the UDF JAR
hive> ADD /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path
Added resources: [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar]
Verify JAR is in Hive CLI Classpath
You should see your jar in the list.
hive> list jars;
/usr/lib/hive/lib/hive-contrib.jar
/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar
Create Temporary Function
It does not have to be a temporary function. You can create your own function, but just to keep things moving, go ahead and create a temporary function.
You may want to add ADD JAR and CREATE TEMPORARY FUNCTION to .hiverc file so they will execute at the beginning of each Hive session.
UDF Output
The first query strips ‘ha’ from string ‘hadoop’ as expected (2 argument evaluate() in code). The second query strips trailing and leading spaces as expected.
hive> CREATE TEMPORARY FUNCTION STRIP AS 'org.hardik.letsdobigdata.Strip';
hive> select strip('hadoop','ha') from dummy;
OK
doop
Time taken: 0.131 seconds, Fetched: 1 row(s)
hive> select strip(' hiveUDF ') from dummy;
OK
hiveUDF
If you have made this far, congratulations! That was our UDF in action! You can follow the code on GitHub.
UDAF (User Defined Aggregated Function)
Now, equipped with our first UDF knowledge, we will move to a next step. When we say aggregation, COUNT, AVG, SUM, MIN, and MAX come to our mind.
I am picking a very simple aggregation function AVG/MEAN, where I am going to work with the “orders” table imported using Sqoop. Once you import it into Hive, it will look like the below (or you can use LOAD DATA INPATH – it is totally up to you.)
hive> select * from orders;
OK
orders.order_id orders.order_date orders.customer_id orders.amount
101 2016-01-01 7 3540
102 2016-03-01 1 240
103 2016-03-02 6 2340
104 2016-02-12 3 5000
105 2016-02-12 3 5500
106 2016-02-14 9 3005
107 2016-02-14 1 20
108 2016-02-29 2 2000
109 2016-02-29 3 2500
110 2016-02-27 1 200
The goal of our UDAF is to find the average amount of orders for all customers in the orders table.
We are looking for Query:
SELECT CUSTOMER_ID, AVG(AMOUNT) FROM ORDERS GROUP BY CUSTOMER_ID;
I am going to replace AVG function with “MEAN” function
But before I begin, let’s stop and think as we are entering the MapReduce world. One of the bottlenecks you want to avoid is moving too much data from the Map to the Reduce phase.
An aggregate function is more difficult to write than a regular UDF. Values are aggregated in chunks (across many maps or many reducers), so the implementation has to be capable of combining partial aggregations into final results.
At a high-level, there are two parts to implementing a Generic UDAF:
- evaluator – The evaluator class actually implements the UDAF logic.
- resolver – The resolver class handles type checking and operator overloading (if you want it), and helps Hive find the correct evaluator class for a given set of argument types.
We are not creating a GenericUDAF. We are creating our one-time aggregation function, so we do not have to worry about a resolver. I am planning write on GenericUDF/GenericUDAF, though. It may be some other day, but soon. :)
How to Write UDAF?
- Create a Java class which extends org.apache.hadoop.hive.ql.exec.hive.UDAF;
- Create an inner class which implements UDAFEvaluator;
- Implement five methods ()
- init() – The init() method initializes the evaluator and resets its internal state. We are using new Column() in the code below to indicate that no values have been aggregated yet.
- iterate() – this method is called every time there is a new value to be aggregated. The evaulator should update its internal state with the result of performing the aggregation (we are doing sum – see below). We return true to indicate that the input was valid.
- terminatePartial() – this method is called when Hive wants a result for the partial aggregation. The method must return an object that encapsulates the state of the aggregation.
- merge() – this method is called when Hive decides to combine one partial aggregation with another.
- terminate() – this method is called when the final result of the aggregation is needed.
- Compile and package the JAR
- CREATE TEMPORARY FUNCTION in hive CLI
- Run Aggregation Query and Verify Output!!!
MeanUDAF.java
package org.hardik.letsdobigdata;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.hardik.letsdobigdata.MeanUDAF.MeanUDAFEvaluator.Column;
@Description(name = "Mean", value = "_FUNC(double) - computes mean", extended = "select col1, MeanFunc(value) from table group by col1;")
public class MeanUDAF extends UDAF {
// Define Logging
static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());
public static class MeanUDAFEvaluator implements UDAFEvaluator {
/**
* Use Column class to serialize intermediate computation
* This is our groupByColumn
*/
public static class Column {
double sum = 0;
int count = 0;
}
private Column col = null;
public MeanUDAFEvaluator() {
super();
init();
}
// A - Initalize evaluator - indicating that no values have been
// aggregated yet.
public void init() {
LOG.debug("Initialize evaluator");
col = new Column();
}
// B- Iterate every time there is a new value to be aggregated
public boolean iterate(double value) throws HiveException {
LOG.debug("Iterating over each value for aggregation");
if (col == null)
throw new HiveException("Item is not initialized");
col.sum = col.sum + value;
col.count = col.count + 1;
return true;
}
// C - Called when Hive wants partially aggregated results.
public Column terminatePartial() {
LOG.debug("Return partially aggregated results");
return col;
}
// D - Called when Hive decides to combine one partial aggregation with another
public boolean merge(Column other) {
LOG.debug("merging by combining partial aggregation");
if(other == null) {
return true;
}
col.sum += other.sum;
col.count += other.count;
return true;
}
// E - Called when the final result of the aggregation needed.
public double terminate(){
LOG.debug("At the end of last record of the group - returning final result");
return col.sum/col.count;
}
}
}
Package and ADD JAR
hive> ADD JAR /home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar;
Added [/home/cloudera/workspace/HiveUDFs/target/HiveUDFs-0.0.1-SNAPSHOT.jar] to class path
Added resources: [/home/cloudera/workspace/HiveUDFs/target/StudentCourseMRJob-0.0.1-SNAPSHOT.jar]
CREATE FUNCTION in HIVE
hive> CREATE TEMPORARY FUNCTION MeanFunc AS 'org.hardik.letsdobigdata.MeanUDAF';
OK
Verify Output
Execute the below group by query. Our function is called MeanFunc
hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
FAILED: SemanticException [Error 10001]: Line 1:42 Table not found 'orders'
hive> use sqoop_workspace;
OK
Time taken: 0.247 seconds
hive> select customer_id, MeanFunc(amount) from orders group by customer_id;
Query ID = cloudera_20160302030202_fb24b7c1-4227-4640-afb9-4ccd29bd735f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1456782715090_0020, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1456782715090_0020/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1456782715090_0020
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-03-02 03:03:16,703 Stage-1 map = 0%, reduce = 0%
2016-03-02 03:03:53,241 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 3.31 sec
2016-03-02 03:03:55,593 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.9 sec
2016-03-02 03:04:09,201 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 6.18 sec
MapReduce Total cumulative CPU time: 6 seconds 180 msec
Ended Job = job_1456782715090_0020
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 6.18 sec HDFS Read: 12524 HDFS Write: 77 SUCCESS
Total MapReduce CPU Time Spent: 6 seconds 180 msec
OK
1 153.33333333333334
2 2000.0
3 4333.333333333333
6 2340.0
7 3540.0
9 3005.0
Time taken: 72.172 seconds, Fetched: 6 row(s)
Verify Individual customer_id : As you can see, group by value matches – you can cross check manually. Thanks for your time and reading my blog – hope this is helpful!!!
hive> select * from orders where customer_id = 1;
OK
102 2016-03-01 1 240
107 2016-02-14 1 20
110 2016-02-27 1 200
Time taken: 0.32 seconds, Fetched: 3 row(s)
hive> select * from orders where customer_id = 2;
OK
108 2016-02-29 2 2000
Time taken: 0.191 seconds, Fetched: 1 row(s)
hive> select * from orders where customer_id = 3;
OK
104 2016-02-12 3 5000
105 2016-02-12 3 5500
109 2016-02-29 3 2500
Time taken: 0.093 seconds, Fetched: 3 row(s)
Published at DZone with permission of Hardik Pandya, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments