Over a million developers have joined DZone.

Secondary Sorting with Avro

DZone's Guide to

Secondary Sorting with Avro

· Big Data Zone ·
Free Resource

Cloudera Data Flow, the answer to all your real-time streaming data problems. Manage your data from edge to enterprise with a no-code approach to developing sophisticated streaming applications easily. Learn more today.

In the last Avro sorting post you saw how sorting Avro records works in MapReduce, and how one can ignore fields in Avro records for partitioning, sorting and grouping. In the process you discovered that ignored fields are limited by being immutable (since they can only be defined once for a schema), which means you can’t vary what fields are ignored for partitioning, sorting or grouping, which is key for secondary sort.

If you wish to use secondary sort with Avro, one option would be to emit a custom Writable as the map output key, and emit an Avro record as the map output value. With this approach you’d write a custom partitioner, and sorting/grouping implementation.

This post looks at another option, where with some hacking you can actually have secondary sort with Avro map output keys.

True secondary sort with an AvroKey

Avro has some utility classes for sorting and hashing (required for the partitioner), but the code is locked-down with private methods. The hacking therefore requires lifting certain parts of Avro’s code, and writing some helper functions to easily allow jobs fine-grained control over what fields are used for secondary sort.

Let’s take an example with the same Avro schema we used in the last post:

{"type": "record", "name": "com.alexholmes.avro.WeatherNoIgnore",
 "doc": "A weather reading.",
 "fields": [
     {"name": "station", "type": "string"},
     {"name": "time", "type": "long"},
     {"name": "temp", "type": "int"},
     {"name": "counter", "type": "int", "default": 0}

For secondary sort you may imagine a scenario where you want to partition output records by the station, sort records using the station, time and temp fields, and finally group by the station and time fields. The code to do this is as follows GitHub source:

    .addPartitionField(WeatherNoIgnore.SCHEMA$, "station", true)
    .addSortField(WeatherNoIgnore.SCHEMA$, "station", true)
    .addSortField(WeatherNoIgnore.SCHEMA$, "time", true)
    .addSortField(WeatherNoIgnore.SCHEMA$, "temp", true)
    .addGroupField(WeatherNoIgnore.SCHEMA$, "station", true)
    .addGroupField(WeatherNoIgnore.SCHEMA$, "time", true)

The ordering of the addXXX calls is significant, as it determines the order in which fields are used for sorting and grouping. The last argument in the addXXX methods is a boolean which indicates whether the ordering is ascending.

Most of the heavy lifting is performed in the AvroSort and AvroDataHack - the latter, as its name indicates, is where some hacking took place to get things working.

The only caveat with the current implementation is that Avro union types aren’t currently supported - I’ll look into that in the near future.

 Cloudera Enterprise Data Hub. One platform, many applications. Start today.


Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}