MapReduce Algorithms – Secondary Sorting
Value to Key Conversion
Creating a composite key is straightforward. First we decide which part(s) of the value we want to account for during the sort, and add those part(s) to the natural key. Then we work on the compareTo method, either in the key class or in a comparator class, to make sure the composite key is accounted for. We will revisit the weather data set and add the temperature to the natural key (the year and month concatenated together) to form a composite key. The result will be a listing of the coldest temperature reading for each month and year. This example was inspired by the secondary sorting example in Hadoop: The Definitive Guide. There are probably better ways to achieve this objective, but it is good enough to demonstrate how secondary sorting works.
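To make the composite key idea concrete, here is a minimal plain-Java sketch with no Hadoop dependencies. The TempKey and CompositeKeyDemo classes are hypothetical stand-ins for the article's TemperaturePair. TempKey holds the natural key (yearMonth) plus the temperature promoted from the value, and its compareTo orders by the natural key first, using the temperature only to break ties.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CompositeKeyDemo {
    public static void main(String[] args) {
        List<TempKey> keys = new ArrayList<>();
        keys.add(new TempKey("190102", 10));
        keys.add(new TempKey("190101", -50));
        keys.add(new TempKey("190101", -206));
        keys.add(new TempKey("190102", -333));
        Collections.sort(keys);
        // Prints, one per line: 190101 -206, 190101 -50, 190102 -333, 190102 10
        keys.forEach(System.out::println);
    }
}

// Hypothetical stand-in for TemperaturePair: plain Java, no Hadoop types.
final class TempKey implements Comparable<TempKey> {
    final String yearMonth;  // natural key, e.g. "190101"
    final int temperature;   // part of the value promoted into the key

    TempKey(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }

    @Override
    public int compareTo(TempKey other) {
        // Primary order: the natural key
        int cmp = yearMonth.compareTo(other.yearMonth);
        if (cmp == 0) {
            // Secondary order: only consulted when the natural keys match
            cmp = Integer.compare(temperature, other.temperature);
        }
        return cmp;
    }

    @Override
    public String toString() {
        return yearMonth + "\t" + temperature;
    }
}
```

After sorting, the records for each month sit next to each other and are already ordered by temperature. Secondary sorting relies on exactly this property.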
Mapper Code
Our mapper code already concatenates the year and month together, but now we will also include the temperature as part of the key. Since the value is now part of the key itself, the mapper emits a NullWritable as its value, where in other cases we would emit the temperature.
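```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondarySortingTemperatureMapper
        extends Mapper<LongWritable, Text, TemperaturePair, NullWritable> {

    // Reused across map() calls to avoid allocating objects per record
    private TemperaturePair temperaturePair = new TemperaturePair();
    private NullWritable nullValue = NullWritable.get();
    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Zero-based offsets 15-20 of the fixed-width record: year and month
        String yearMonth = line.substring(15, 21);
        // Zero-based offsets 87-91: signed temperature; skip an explicit '+'
        int tempStartPosition = 87;
        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }
        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
        if (temp != MISSING) {
            // The temperature travels in the key, so the value carries nothing
            temperaturePair.setYearMonth(yearMonth);
            temperaturePair.setTemperature(temp);
            context.write(temperaturePair, nullValue);
        }
    }
}
```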
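You can try the mapper's parsing step outside Hadoop. The following is a minimal sketch that reuses the same fixed-width offsets: zero-based 15-20 for the year and month, and 87-91 for the signed temperature. It returns an empty Optional for the 9999 missing marker. The TemperatureReading, WeatherLineParser and ParserDemo names are hypothetical, and the demo builds a synthetic record rather than a real NCDC line.

```java
import java.util.Arrays;
import java.util.Optional;

class ParserDemo {
    public static void main(String[] args) {
        // Synthetic 93-character record: only the fields the parser reads are set
        char[] buf = new char[93];
        Arrays.fill(buf, '0');
        "190101".getChars(0, 6, buf, 15);
        "-0206".getChars(0, 5, buf, 87);
        WeatherLineParser.parse(new String(buf))
                .ifPresent(r -> System.out.println(r.yearMonth + " " + r.temperature));
        // prints "190101 -206"
    }
}

// Hypothetical holder for the two fields the mapper promotes into its key.
final class TemperatureReading {
    final String yearMonth;
    final int temperature;

    TemperatureReading(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }
}

final class WeatherLineParser {
    private static final int MISSING = 9999;

    // Returns the (yearMonth, temperature) pair, or empty for a missing reading.
    static Optional<TemperatureReading> parse(String line) {
        // Zero-based offsets 15-20: year and month, e.g. "190101"
        String yearMonth = line.substring(15, 21);
        // Zero-based offsets 87-91: sign plus four digits, e.g. "-0206" or "+0044"
        int tempStart = 87;
        if (line.charAt(tempStart) == '+') {
            tempStart += 1; // skip an explicit '+', as the article's mapper does
        }
        int temp = Integer.parseInt(line.substring(tempStart, 92));
        if (temp == MISSING) {
            return Optional.empty(); // "+9999" marks a missing reading
        }
        return Optional.of(new TemperatureReading(yearMonth, temp));
    }
}
```

Since Java 7, Integer.parseInt also accepts a leading '+', so the explicit skip mainly matters on older JVMs. It does no harm either way.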
Now that we have added the temperature to the key, we have set the stage for secondary sorting. What’s left is to write code that takes the temperature into account when necessary. Here we have two choices: write a separate Comparator, or adjust the compareTo method of the TemperaturePair class (TemperaturePair implements WritableComparable). In most cases I would recommend writing a separate Comparator, but since the TemperaturePair class was written specifically to demonstrate secondary sorting, we will modify its compareTo method.
```java
@Override
public int compareTo(TemperaturePair temperaturePair) {
    // Compare on the natural key (year and month) first
    int compareValue = this.yearMonth.compareTo(temperaturePair.getYearMonth());
    if (compareValue == 0) {
        // Same year and month: fall back to the temperature (ascending)
        compareValue = temperature.compareTo(temperaturePair.getTemperature());
    }
    return compareValue;
}
```
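For comparison, here is a sketch of the separate-Comparator option, which keeps the ordering out of the key class. In a Hadoop job, such a comparator would typically be a WritableComparator subclass registered with Job.setSortComparatorClass. The sketch shows only the ordering logic in plain Java, using a hypothetical TempKey class in place of TemperaturePair.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OrderingDemo {
    public static void main(String[] args) {
        List<TempKey> keys = new ArrayList<>(List.of(
                new TempKey("190102", 10),
                new TempKey("190101", -50),
                new TempKey("190101", -206)));
        keys.sort(TemperatureOrdering.COMPOSITE);
        keys.forEach(k -> System.out.println(k.yearMonth + " " + k.temperature));
    }
}

// Hypothetical key with no ordering of its own.
final class TempKey {
    final String yearMonth;
    final int temperature;

    TempKey(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }
}

final class TemperatureOrdering {
    // Natural key (yearMonth) first, then temperature ascending:
    // the same order as the compareTo above, but defined outside the key class.
    static final Comparator<TempKey> COMPOSITE =
            Comparator.comparing((TempKey k) -> k.yearMonth)
                      .thenComparingInt(k -> k.temperature);
}
```

Keeping the order in a comparator lets the same key class be sorted different ways in different jobs.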
If we wanted to sort temperatures in descending order, we could simply multiply the result of the temperature comparison by -1.
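Here is a minimal sketch of the descending variant, again using a hypothetical plain-Java TempKey. Instead of multiplying by -1, it swaps the operands of the temperature comparison, which reverses the order the same way. Multiplying by -1 is safe when the comparison only returns -1, 0 or 1, but negating an arbitrary int misbehaves for Integer.MIN_VALUE. With this ordering, the first record of each month would be the warmest reading rather than the coldest.

```java
import java.util.ArrayList;
import java.util.List;

class DescendingDemo {
    public static void main(String[] args) {
        List<TempKey> keys = new ArrayList<>(List.of(
                new TempKey("190101", -206), new TempKey("190101", -50),
                new TempKey("190102", -333), new TempKey("190102", 10)));
        keys.sort(DescendingTemperature::compare);
        // Prints, one per line: 190101 -50, 190101 -206, 190102 10, 190102 -333
        keys.forEach(k -> System.out.println(k.yearMonth + " " + k.temperature));
    }
}

// Hypothetical stand-in for TemperaturePair.
final class TempKey {
    final String yearMonth;
    final int temperature;

    TempKey(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }
}

final class DescendingTemperature {
    // yearMonth ascending, temperature descending
    static int compare(TempKey a, TempKey b) {
        int cmp = a.yearMonth.compareTo(b.yearMonth);
        if (cmp == 0) {
            // Operands swapped (b before a) to reverse the temperature order
            cmp = Integer.compare(b.temperature, a.temperature);
        }
        return cmp;
    }
}
```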
Now that we have completed the part necessary for sorting, we need to write a custom partitioner.
Partitioner Code
To ensure that only the natural key is considered when deciding which reducer receives the data, we need to write a custom partitioner. The code is straightforward: it considers only the yearMonth value of the TemperaturePair class when calculating which reducer the data will be sent to.
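```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TemperaturePartitioner extends Partitioner<TemperaturePair, NullWritable> {
    @Override
    public int getPartition(TemperaturePair temperaturePair, NullWritable nullWritable, int numPartitions) {
        // Partition on the natural key only. hashCode() can be negative, so clear
        // the sign bit before taking the modulus to keep the result in range.
        return (temperaturePair.getYearMonth().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```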
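Here is a plain-Java sketch of natural-key partitioning, useful for checking that every partition number lands in range. It uses String.hashCode as a stand-in for Text.hashCode (an assumption: the two hash differently, so the actual partition numbers would not match Hadoop's). The masking idiom is the one Hadoop's default HashPartitioner uses. The YearMonthPartitioning and PartitionDemo names are hypothetical.

```java
class PartitionDemo {
    public static void main(String[] args) {
        String[] months = {"190101", "190102", "190103", "190112"};
        for (String m : months) {
            System.out.println(m + " -> reducer " + YearMonthPartitioning.partitionFor(m, 4));
        }
    }
}

final class YearMonthPartitioning {
    // Only the natural key is hashed; the temperature never influences placement,
    // so every reading for a month goes to the same reducer.
    static int partitionFor(String yearMonth, int numPartitions) {
        // & Integer.MAX_VALUE clears the sign bit so a negative hash code
        // cannot produce a negative partition number.
        return (yearMonth.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Without the mask, a key whose hash code is negative yields a negative remainder, and Hadoop rejects the record with an illegal-partition error at runtime.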
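While the custom partitioner guarantees that all of the data for a given year and month arrives at the same reducer, we still need to account for the fact that the reducer groups records by key. With a composite key, each distinct year, month and temperature combination would otherwise form its own group.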
Grouping Comparator
Once the data reaches a reducer, it is grouped by key. Since we have a composite key, we need to make sure records are grouped solely by the natural key. We do this by writing a custom grouping comparator and registering it on the job with Job.setGroupingComparatorClass. The comparator below considers only the yearMonth field of the TemperaturePair class when grouping records together.
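```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class YearMonthGroupingComparator extends WritableComparator {

    public YearMonthGroupingComparator() {
        // true: let WritableComparator create TemperaturePair instances for deserialization
        super(TemperaturePair.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable tp1, WritableComparable tp2) {
        TemperaturePair temperaturePair = (TemperaturePair) tp1;
        TemperaturePair temperaturePair2 = (TemperaturePair) tp2;
        // Group on the natural key only; the temperature is ignored here
        return temperaturePair.getYearMonth().compareTo(temperaturePair2.getYearMonth());
    }
}
```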
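To see how the sort comparator and the grouping comparator work together on the reduce side, here is a plain-Java simulation. It sorts the composite keys, walks them in order, and starts a new group whenever the grouping comparator reports a different yearMonth. The article does not show its reducer, so the sketch assumes a hypothetical reducer that emits each group's first key. Because each group's records arrive sorted by temperature, that first key is the lowest reading. TempKey, GroupingSimulation and GroupingDemo are illustrative names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class GroupingDemo {
    public static void main(String[] args) {
        List<TempKey> in = List.of(
                new TempKey("190102", 10), new TempKey("190101", -50),
                new TempKey("190102", -333), new TempKey("190101", -206));
        // Prints, one per line: 190101 -206, 190102 -333
        GroupingSimulation.firstOfEachGroup(in)
                .forEach(k -> System.out.println(k.yearMonth + " " + k.temperature));
    }
}

// Hypothetical stand-in for TemperaturePair.
final class TempKey {
    final String yearMonth;
    final int temperature;

    TempKey(String yearMonth, int temperature) {
        this.yearMonth = yearMonth;
        this.temperature = temperature;
    }
}

final class GroupingSimulation {
    // Sort order: the full composite key
    static final Comparator<TempKey> SORT =
            Comparator.comparing((TempKey k) -> k.yearMonth).thenComparingInt(k -> k.temperature);
    // Grouping: the natural key only, like YearMonthGroupingComparator
    static final Comparator<TempKey> GROUP = Comparator.comparing((TempKey k) -> k.yearMonth);

    // Keeps the first key of each group, as an assumed reducer that writes
    // the key once per group would.
    static List<TempKey> firstOfEachGroup(List<TempKey> keys) {
        List<TempKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<TempKey> result = new ArrayList<>();
        TempKey groupStart = null;
        for (TempKey k : sorted) {
            if (groupStart == null || GROUP.compare(groupStart, k) != 0) {
                result.add(k);   // a new natural key begins a new group
                groupStart = k;
            }
        }
        return result;
    }
}
```

If GROUP were the same as SORT, every distinct temperature would start its own group and every record would be emitted. This is the situation the grouping comparator prevents.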
Results
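Here are the results of running our secondary sort job. As in the NCDC source data, temperatures are in tenths of a degree Celsius, so -206 means -20.6 °C: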
```
new-host-2:sbin bbejeck$ hdfs dfs -cat secondary-sort/part-r-00000
190101 -206
190102 -333
190103 -272
190104 -61
190105 -33
190106 44
190107 72
190108 44
190109 17
190110 -33
190111 -217
190112 -300
```
Conclusion
While sorting data by value may not be a common need, it’s a nice tool to have in your back pocket. We have also been able to take a deeper look at the inner workings of Hadoop by working with custom partitioners and grouping comparators.
Published at DZone with permission of Bill Bejeck, DZone MVB.