Counters in Apache Pig

DZone 's Guide to

Counters in Apache Pig

Counters in Hadoop are lightweight objects helping us in tracking the progress of Map and reduce tasks.

· Big Data Zone ·
Free Resource

When dealing with enormous amount of raw data daily, there might be scenarios where we might be interested in gathering information about the data that we are analyzing, like how many types of records were processed, how many invalid records were found while running the job, etc. This functionality is provided by Counters in Hadoop, which are lightweight objects helping us in tracking the progress of Map and reduce tasks. Since Apache Pig is a widely used for ETL flows in organisations and  Apache Pig generates mapreduce jobs underneath we can make use of these hadoop counters in Apache Pig to suit our needs.  In this post lets see how we can gather information about the data we want to deal with and leverage hadoop counters functionality in Pig and store these counter metrics in a persistent store like a file,database etc., so that this information can later be viewed upon to understand more about our data.   

In order to make use of hadoop counters, we need to write pigudf and perform all the related operations within the udf. Finally once all validations are over we can store these metrics in a persistent store like a file or database, in this post we will store the counter metrics in a file in HDFS.

Twitters Elephant bird utility provides methods to make use of hadoop counters with Pig. We  need two jars elephant-bird-pig and elephant-bird-hadoop-compat to be placed in projects classpath. These can be downloaded from maven repository. I have added links for downloading these jars below.

elephantbird jar download 

Lets take a sample use case where we want to validate a dataset containing users name,phonenumber,mail,occupation,age and gender.

From this dataset we like to understand how many users have not provided their information properly.If any of the fields is blank we will increment a counter accordingly for that field, i.e if user has not provided his age, we will increment a counter called "missing_age" ,if gender is blank then "missing_gender" and others accordingly. We will also validate the values for example age > 150 is invalid data and similarly, gender field values other then Male or Female is invalid. We will capture all these things and store the stats in a hadoop file.  

Lets assume this is the format of our dataset.

Structure of Dataset: name,phonenumber,mail,occupation,age,gender

This is our sample dataset


In this dataset in the last record we can find that phonenumber of the user is missing and his age provided is wrong and even his gender  is wrong.

In the following UDF we have checked if any othe fields i.e name,age,gender etc are missing and also validated if age is not greater than 100 and if gender field has values either male or female. If there are any missing fields then we increment a counter, say if age is missing then we increment a counter "MISSING_AGE" by 1, say if age is invalid then we increment a counter "INVALID_AGE", and similary we do the same for other fields.

In the following code snippet you can notice a constant called COUNTER_GROUP_NAME, in hadoop all COUNTERS are populated under a COUNTER GROUP. Here we create our own customer counter group called CUSTOMCOUNTERS and all our counters are populated under this counter group. I will show this in screenshot after running the pigscript.

The above UDF needs to be compile and package as a jar and REGISTERED with our pigscript.

package com.dzone.pig;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import com.twitter.elephantbird.pig.util.PigCounterHelper;

public class PigCounters extends EvalFunc<String> {

PigCounterHelper counterHelper = new PigCounterHelper();
private final long incrValue = 1;
private final String COUNTERDELIMITER = "_";

public static enum COUNTERS {

public PigCounters() {};
public void incrementCounters(String counterName) {
counterHelper.incrCounter(COUNTER_GROUP_NAME, counterName, incrValue);

public String exec(Tuple input) throws IOException {
String name = (String) input.get(0);
String phone = (String) input.get(1);
String mail = (String) input.get(2);
String occupation = (String) input.get(3);
String age = (String) input.get(4);
String gender = (String) input.get(5);

if (name == null || name.isEmpty()) {
if (phone == null || phone.isEmpty()) {
} else if (phone.length() != 10)
if (mail == null || mail.isEmpty()) {
if (occupation == null || occupation.isEmpty()) {
if (age == null || age.isEmpty()) {
} else if (Integer.parseInt(age) > 100)
if (gender == null || gender.isEmpty()) {
} else if (!(gender.equalsIgnoreCase("male") || (gender
return "validated";

 With the help of the above UDF we validate our data and increment counters. These counters are  will be visible in the COUNTERS  page for our job under the group name "CUSTOMCOUNTERS".But our goal is to store the counters in hadoop file.

For that we need to write a PIGLISTENER which will listen to our pig job and store the collected counters once pig job ends.

For this Apache Pig provides an interface called PIGPROCESSLISTENER, which listens to events like when our pig job starts, ends, submitted and calls those methods appropriately. You can understand more about these by taking a look at the following snippet.

In the following snippet we collect all counter under group "CUSTOMCOUNTERS" and store them in hadoop file.

package com.dzone.pig;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PigListener implements PigProgressNotificationListener {

private final Logger log = LoggerFactory.getLogger(PigListener.class);
private HashMap<String, Long> countersToStore = new HashMap<String, Long>(0);
boolean isThereAnyCountersToStore = false;

public void initialPlanNotification(String scriptId, MROperPlan plan) {}
public void launchStartedNotification(String scriptId, int numJobsToLaunch) {}
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {}
public void jobStartedNotification(String scriptId, String assignedJobId) {}
//This method is called after completion of each mapreduce job spawned by pig script
public void jobFinishedNotification(String scriptId, JobStats jobStats) {
Group counterGroup = jobStats.getHadoopCounters().getGroup(COUNTER_GROUP_NAME);
Iterator<Counter> countersInGroup = counterGroup.iterator();
if (counterGroup.size() > 0)
isThereAnyCountersToStore = true;
while (countersInGroup.hasNext()) {
Counter currentCounter = countersInGroup.next();
log.info("CounterName: " + currentCounter.getDisplayName() + "  "+ "CounterValue:" + currentCounter.getValue());
List<InputStats> inputStatsList = jobStats.getInputs();
if (inputStatsList.size() > 0)
isThereAnyCountersToStore = true;
Iterator<InputStats> inputStatsIterator = inputStatsList.iterator();
while (inputStatsIterator.hasNext()) {
InputStats inputStats = inputStatsIterator.next();
public void jobFailedNotification(String scriptId, JobStats jobStats) {}
public void outputCompletedNotification(String scriptId,OutputStats outputStats) {}
public void progressUpdatedNotification(String scriptId, int progress) {}
//This method is called once our pigscript completes
public void launchCompletedNotification(String scriptId,int numJobsSucceeded) {
if (isThereAnyCountersToStore)
try {
} catch (IOException e) {
} finally {
isThereAnyCountersToStore = false;
//This method stores the collected counters  in  file in hdfs.
private void storeCountersInFile() throws IOException {
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
Path filenamePath = new Path("/tmp/output.txt");
try {
if (fs.exists(filenamePath)) {
fs.delete(filenamePath, true);
FSDataOutputStream fout = fs.create(filenamePath);
for (Map.Entry<String, Long> entry : countersToStore.entrySet()) {
fout.writeBytes(entry.getKey() + "," + entry.getValue());
} finally {

Lets call our pigscript pigcounter_example.pig

Before running the script dont forget to copy elephant bird jars and our UDF jar which contains both listener and validator into pig library folder. i.e for my installation it is /usr/lib/pig/lib.

Once required jars are copied our pig script can be run using the following command.

pig -Dpig.notification.listener=com.dzone.pig.PigListener -f pigcounter_example.pig

pig.notification.listener property registers listener with our pigscript and calls the methods like jobfinishednotification,launchcompletednotification etc. when our pig job completes.

 The following is our pig script.

  REGISTER /home/deva/deva/xoanon/dzone/pig/PigCounters-0.0.1-SNAPSHOT.jar;
  REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-hadoop-compat-4.5.jar;
  REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-pig-4.5.jar;
  DEFINE Validator com.dzone.pig.PigCounters();

  input_data  = LOAD '/tmp/sampledata.csv' USING PigStorage(',') AS(name:chararray,phonenumber:chararray,mail:chararray,occupation:chararray,age:chararray,gender:chararray);

  --calling our UDF here to validate and increment counters accordingly

  validated_data = FOREACH input_data GENERATE Validator(*) AS validated;
  dump validated_data;

Here I have attached screenshot to show you how our counters are displayed in the counters page of our job. Once pig job completed you can see the output file /tmp/output.txt in your hdfs.

apache pig ,big data ,bigdata ,counters ,etl ,hadoop ,hdfs

Published at DZone with permission of Devarajan Srinivasan . See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}