Counters in Apache Pig
Counters in Hadoop are lightweight objects that help us track the progress of map and reduce tasks.
When dealing with enormous amounts of raw data daily, there are scenarios where we might want to gather information about the data we are analyzing: how many types of records were processed, how many invalid records were found while running the job, and so on. This functionality is provided by counters in Hadoop, which are lightweight objects that help us track the progress of map and reduce tasks. Since Apache Pig is widely used for ETL flows in organizations and generates MapReduce jobs underneath, we can make use of these Hadoop counters in Apache Pig to suit our needs. In this post, let's see how we can gather information about our data by leveraging Hadoop counters in Pig, and how we can store these counter metrics in a persistent store like a file or database, so that they can later be examined to understand more about our data.
In order to make use of Hadoop counters, we need to write a Pig UDF and perform all the related operations within the UDF. Once all validations are over, we can store these metrics in a persistent store like a file or database; in this post we will store the counter metrics in a file in HDFS.
Twitter's Elephant Bird utility provides methods for using Hadoop counters with Pig. We need two jars, elephant-bird-pig and elephant-bird-hadoop-compat, on the project's classpath. These can be downloaded from the Maven repository; I have added links for downloading these jars below.
Let's take a sample use case where we want to validate a dataset containing each user's name, phone number, mail, occupation, age, and gender.
From this dataset, we would like to understand how many users have not provided their information properly. If any of the fields is blank, we will increment a counter for that field: if a user has not provided their age, we will increment a counter called "MISSING_AGE"; if gender is blank, then "MISSING_GENDER"; and so on. We will also validate the values: for example, an age greater than 100 is invalid, and similarly, gender values other than male or female are invalid. We will capture all of this and store the stats in a file in HDFS.
Let's assume our dataset has the following format.
Structure of Dataset: name,phonenumber,mail,occupation,age,gender
This is our sample dataset:
Joe,1122334455,joe@xzy.com,student,20,male
Jim,2211334455,jim@abc.com,engineer,26,male
Lisa,3311224455,lisa@xyz.com,musician,25,female
John,,john@abc.com,writer,120,mafe
In the last record of this dataset, we can see that the user's phone number is missing, the age provided is wrong, and the gender value is invalid.
In the validator UDF, we check whether any of the fields (name, age, gender, etc.) are missing, and also validate that age is not greater than 100 and that the gender field is either male or female. If a field is missing, we increment a counter: if age is missing, we increment a counter "MISSING_AGE" by 1; if age is invalid, we increment a counter "INVALID_AGE"; and similarly for the other fields.
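The UDF's source is not reproduced here, but its per-record validation logic can be sketched in plain Java as follows. The class and method names are hypothetical; in the real UDF, each counter name this returns would be incremented (in the CUSTOMCOUNTERS group) via Elephant Bird's PigCounterHelper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the validation logic inside the Pig UDF:
// given the six fields of one record, return the names of the
// counters that should be incremented for it.
public class RecordValidator {

    static final String[] FIELD_NAMES =
        {"NAME", "PHONENUMBER", "MAIL", "OCCUPATION", "AGE", "GENDER"};

    public static List<String> countersFor(String... fields) {
        List<String> counters = new ArrayList<>();

        // Missing-field checks: one MISSING_* counter per blank field.
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            String value = (i < fields.length) ? fields[i] : null;
            if (value == null || value.trim().isEmpty()) {
                counters.add("MISSING_" + FIELD_NAMES[i]);   // e.g. MISSING_AGE
            }
        }

        // Value checks, mirroring the rules described in the text.
        String age = fields.length > 4 ? fields[4] : "";
        if (age != null && !age.trim().isEmpty()) {
            try {
                if (Integer.parseInt(age.trim()) > 100) {
                    counters.add("INVALID_AGE");             // age > 100 is invalid
                }
            } catch (NumberFormatException e) {
                counters.add("INVALID_AGE");                 // non-numeric age
            }
        }
        String gender = fields.length > 5 ? fields[5] : "";
        if (gender != null && !gender.trim().isEmpty()
                && !gender.trim().equalsIgnoreCase("male")
                && !gender.trim().equalsIgnoreCase("female")) {
            counters.add("INVALID_GENDER");                  // only male/female allowed
        }
        return counters;
    }
}
```

For the last record of the sample dataset (John), this would flag MISSING_PHONENUMBER, INVALID_AGE, and INVALID_GENDER.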
In the listener code below, you can notice a constant called COUNTER_GROUP_NAME: in Hadoop, all counters are populated under a counter group. Here we create our own custom counter group called CUSTOMCOUNTERS, and all our counters are populated under this group. I will show this in a screenshot after running the Pig script.
package com.dzone.pig;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PigListener implements PigProgressNotificationListener {

    private final Logger log = LoggerFactory.getLogger(PigListener.class);
    private static final String COUNTER_GROUP_NAME = "CUSTOMCOUNTERS";
    private final HashMap<String, Long> countersToStore = new HashMap<String, Long>();
    private boolean isThereAnyCountersToStore = false;

    @Override
    public void initialPlanNotification(String scriptId, MROperPlan plan) {}

    @Override
    public void launchStartedNotification(String scriptId, int numJobsToLaunch) {}

    @Override
    public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {}

    @Override
    public void jobStartedNotification(String scriptId, String assignedJobId) {}

    // Called after each MapReduce job spawned by the Pig script completes.
    @Override
    public void jobFinishedNotification(String scriptId, JobStats jobStats) {
        // Collect every counter from our custom counter group.
        Group counterGroup = jobStats.getHadoopCounters().getGroup(COUNTER_GROUP_NAME);
        if (counterGroup.size() > 0) {
            isThereAnyCountersToStore = true;
        }
        Iterator<Counter> countersInGroup = counterGroup.iterator();
        while (countersInGroup.hasNext()) {
            Counter currentCounter = countersInGroup.next();
            log.info("CounterName: " + currentCounter.getDisplayName()
                    + " CounterValue: " + currentCounter.getValue());
            countersToStore.put(currentCounter.getDisplayName(), currentCounter.getValue());
        }
        // Also record the total number of input records the job read.
        List<InputStats> inputStatsList = jobStats.getInputs();
        if (inputStatsList.size() > 0) {
            isThereAnyCountersToStore = true;
        }
        for (InputStats inputStats : inputStatsList) {
            countersToStore.put("TOTAL_INPUT_RECORDS", inputStats.getNumberRecords());
        }
    }

    @Override
    public void jobFailedNotification(String scriptId, JobStats jobStats) {}

    @Override
    public void outputCompletedNotification(String scriptId, OutputStats outputStats) {}

    @Override
    public void progressUpdatedNotification(String scriptId, int progress) {}

    // Called once our Pig script completes.
    @Override
    public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
        if (isThereAnyCountersToStore) {
            try {
                storeCountersInFile();
            } catch (IOException e) {
                log.error("Failed to store counters in HDFS", e);
            } finally {
                isThereAnyCountersToStore = false;
            }
        }
    }

    // Stores the collected counters in a file in HDFS, one "name,value" pair per line.
    private void storeCountersInFile() throws IOException {
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(config);
        Path filenamePath = new Path("/tmp/output.txt");
        FSDataOutputStream fout = null;
        try {
            if (fs.exists(filenamePath)) {
                fs.delete(filenamePath, true);
            }
            fout = fs.create(filenamePath);
            for (Map.Entry<String, Long> entry : countersToStore.entrySet()) {
                fout.writeBytes(entry.getKey() + "," + entry.getValue() + "\n");
            }
        } finally {
            if (fout != null) {
                fout.close(); // flush the stream before closing the FileSystem
            }
            fs.close();
        }
    }
}
Let's call our Pig script pigcounter_example.pig.
Before running the script, don't forget to copy the Elephant Bird jars and our UDF jar (which contains both the listener and the validator) into the Pig library folder; for my installation that is /usr/lib/pig/lib.
Once the required jars are in place, our Pig script can be run using the following command:
pig -Dpig.notification.listener=com.dzone.pig.PigListener -f pigcounter_example.pig
The pig.notification.listener property registers our listener with the Pig script, so that methods like jobFinishedNotification and launchCompletedNotification are called as each job and the overall script complete.
The following is our Pig script:
REGISTER /home/deva/deva/xoanon/dzone/pig/PigCounters-0.0.1-SNAPSHOT.jar;
REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-hadoop-compat-4.5.jar;
REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-pig-4.5.jar;
DEFINE Validator com.dzone.pig.PigCounters();
input_data = LOAD '/tmp/sampledata.csv' USING PigStorage(',') AS (name:chararray, phonenumber:chararray, mail:chararray, occupation:chararray, age:chararray, gender:chararray);
--calling our UDF here to validate and increment counters accordingly
validated_data = FOREACH input_data GENERATE Validator(*) AS validated;
DUMP validated_data;
Here I have attached a screenshot to show how our counters are displayed on the counters page of the job. Once the Pig job completes, you can see the output file /tmp/output.txt in your HDFS.
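Given the sample dataset above and the way storeCountersInFile writes its entries, the contents of /tmp/output.txt would look something like the following (one name,value pair per line; the exact order may vary, since the counters are kept in a HashMap, and the counter names assume the validation rules described earlier):

```
MISSING_PHONENUMBER,1
INVALID_AGE,1
INVALID_GENDER,1
TOTAL_INPUT_RECORDS,4
```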
Published at DZone with permission of Devarajan Srinivasan. See the original article here.