# In-mapper Combiner Program to Calculate Average

In my previous post, I described how an in-mapper combiner can make an M/R program more efficient. That post also presented the M/R algorithms for calculating an average both with and without the in-mapper combiner optimization.

In this post, I am sharing the code for both algorithms.

The M/R program that calculates the average without an in-mapper combiner is given below:

```java
package com.hadoop.imcdp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class AvgDriver extends Configured implements Tool{

public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

String record;

protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
record = value.toString();
String[] fields = record.split(",");

Integer s_id = new Integer(fields[0]);
Integer marks = new Integer(fields[2]);
context.write(new IntWritable(s_id), new IntWritable(marks));
} // end of map method
} // end of mapper class

public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {

protected void reduce(IntWritable key, Iterable<IntWritable> values, Reducer<IntWritable, IntWritable, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException {
Integer s_id = key.get();
Integer sum = 0;
Integer cnt = 0;

for (IntWritable value:values) {
sum = sum + value.get();
cnt = cnt + 1;
}

Float avg_m = (float) (sum/cnt);
context.write(new IntWritable(s_id), new FloatWritable(avg_m));
}
}

/**
 * Configures and runs the averaging job.
 *
 * <p>Expects two arguments after any generic Hadoop options: the input path and
 * the output path. An existing output path is deleted recursively before the
 * job is submitted.
 *
 * @param args command-line arguments
 * @return 0 if the job succeeds, 1 if it fails, 2 if the arguments are missing
 * @throws Exception if job setup, output deletion or job execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // Start from the configuration injected via setConf() (ToolRunner does this)
    // so options it already parsed are not thrown away.
    Configuration conf = getConf();
    if (conf == null) {
        conf = new Configuration();
    }
    // Still parse generic options here so a direct call to run() behaves as before.
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (remaining.length < 2) {
        System.err.println("Usage: AvgDriver [generic options] <input path> <output path>");
        return 2;
    }
    String input = remaining[0];
    String output = remaining[1];

    Job job = Job.getInstance(conf, "Avg");
    job.setJarByClass(ImcdpMap.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(ImcdpMap.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setReducerClass(ImcdpReduce.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(FloatWritable.class);

    FileInputFormat.setInputPaths(job, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(job, outPath);
    // Remove output left by a previous run. NOTE: this is a recursive delete of
    // whatever the second argument points at.
    outPath.getFileSystem(conf).delete(outPath, true);

    // Wait exactly once; the result decides the exit code.
    return job.waitForCompletion(true) ? 0 : 1;
}

/**
 * Command-line entry point: runs {@link AvgDriver} through {@code ToolRunner}
 * and exits the JVM with the code it returns.
 *
 * @param args command-line arguments passed on to {@code ToolRunner}
 * @throws Exception if the tool run throws
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new AvgDriver(), args));
}
}
```

The M/R program that calculates the average with an in-mapper combiner is given below:

```java
package com.hadoop.imcdp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class ImcdpAvgDriver extends Configured implements Tool{

public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntPair> {

String record;
Map partial_sum = new HashMap<Integer, Integer>();
Map record_count = new HashMap<Integer, Integer>();

protected void map(LongWritable key, Text value, Mapper.Context context) {
record = value.toString();
String[] fields = record.split(",");

Integer s_id = new Integer(fields[0]);
Integer marks = new Integer(fields[2]);

if (partial_sum.containsKey(s_id)) {
Integer sum = (Integer) partial_sum.get(s_id) + marks;
partial_sum.put(s_id, sum);
} else {
partial_sum.put(s_id, marks);
}

if (record_count.containsKey(s_id)) {
Integer count = (Integer) record_count.get(s_id) + 1;
record_count.put(s_id, count);
} else {
record_count.put(s_id, 1);
}
} // end of map method

protected void cleanup(Context context) throws IOException, InterruptedException {
Iterator<Map.Entry<Integer, Integer>> itr1 = partial_sum.entrySet().iterator();

while (itr1.hasNext()) {
Entry<Integer, Integer> entry1 = itr1.next();
Set record_count_set = record_count.entrySet();
Integer s_id_1 = entry1.getKey();
Integer partial_sum_1 = entry1.getValue();
Integer record_count_1 = (Integer) record_count.get(s_id_1);

context.write(new IntWritable(s_id_1), new IntPair(partial_sum_1, record_count_1));
System.out.println(s_id_1+","+partial_sum_1+","+record_count_1);
}
} // end of cleanup
} // end of mapper class

public static class ImcdpReduce extends Reducer<IntWritable, IntPair, IntWritable, FloatWritable> {

protected void reduce(IntWritable key, Iterable<IntPair> values, Reducer<IntWritable, IntPair, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException {
Integer s_id = key.get();
Integer sum = 0;
Integer cnt = 0;
System.out.println(key+","+values);
for (IntPair value:values) {
sum = sum + value.getFirstInt();
cnt = cnt + value.getSecondInt();
}

System.out.println(sum+","+cnt);
Float avg_m = (float) (sum/cnt);
context.write(new IntWritable(s_id), new FloatWritable(avg_m));
}
}

/**
 * Configures and runs the in-mapper-combiner averaging job.
 *
 * <p>Expects two arguments after any generic Hadoop options: the input path and
 * the output path. An existing output path is deleted recursively before the
 * job is submitted.
 *
 * @param args command-line arguments
 * @return 0 if the job succeeds, 1 if it fails, 2 if the arguments are missing
 * @throws Exception if job setup, output deletion or job execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // Start from the configuration injected via setConf() (ToolRunner does this)
    // so options it already parsed are not thrown away.
    Configuration conf = getConf();
    if (conf == null) {
        conf = new Configuration();
    }
    // Still parse generic options here so a direct call to run() behaves as before.
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (remaining.length < 2) {
        System.err.println("Usage: ImcdpAvgDriver [generic options] <input path> <output path>");
        return 2;
    }
    String input = remaining[0];
    String output = remaining[1];

    Job job = Job.getInstance(conf, "IMCDP");
    job.setJarByClass(ImcdpMap.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(ImcdpMap.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntPair.class);

    job.setReducerClass(ImcdpReduce.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(FloatWritable.class);

    FileInputFormat.setInputPaths(job, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(job, outPath);
    // Remove output left by a previous run. NOTE: this is a recursive delete of
    // whatever the second argument points at.
    outPath.getFileSystem(conf).delete(outPath, true);

    // Wait exactly once; the result decides the exit code.
    return job.waitForCompletion(true) ? 0 : 1;
}

/**
 * Command-line entry point: runs {@link ImcdpAvgDriver} through
 * {@code ToolRunner} and exits the JVM with the code it returns.
 *
 * @param args command-line arguments passed on to {@code ToolRunner}
 * @throws Exception if the tool run throws
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new ImcdpAvgDriver(), args));
}
}
```

The programs took 56 s and 42 s, respectively, to process 10 million records on my laptop. That is a 25% reduction in execution time (equivalently, the in-mapper combiner version runs about 1.33× faster).

