Processing Common Serialization Formats

By Alex Holmes, author of Hadoop in Practice

MapReduce offers straightforward, well-documented support for working with simple data formats such as log files. But the use of MapReduce has evolved beyond log files to more sophisticated data serialization formats, such as text, XML, and JSON, to the point that its documentation and built-in support run dry. In this article, based on chapter 3 of Hadoop in Practice, author Alex Holmes shows you how to work with file formats such as XML and JSON.

XML and JSON are industry-standard data interchange formats. Their ubiquity in the technology industry is evidenced by their heavy adoption in data storage and exchange.

XML

XML has existed since 1998 as a mechanism to represent data that's readable by machine and human alike. It became a universal language for data exchange between systems. It's employed by many standards today such as SOAP and RSS, and used as an open data format for products such as Microsoft Office.

Technique: MapReduce and XML

While MapReduce comes bundled with an InputFormat that works with text, it doesn't come with one that supports XML. Working on a single XML file in parallel in MapReduce is tricky because XML doesn't contain a synchronization marker in its data format.

Problem

You want to work with large XML files in MapReduce and be able to split and process them in parallel.

Solution

Mahout's XMLInputFormat can be used to work with XML files in HDFS with MapReduce. It reads records that are delimited by a specific XML begin and end tag. This technique also shows how to emit XML as MapReduce output.

Discussion

MapReduce doesn't contain built-in support for XML, so we'll turn to another Apache project, Mahout, a machine learning system, to provide an XML InputFormat. To showcase it, let's write a MapReduce job that uses Mahout's XML InputFormat to read property names and values from Hadoop's configuration files. The first step is to set up the job configuration:

conf.set("xmlinput.start", ""); #A

conf.set("xmlinput.end", ""); #B
	
job.setInputFormatClass(XmlInputFormat.class); #C

#A Define the string form of the XML start tag. Your job takes Hadoop config files as input, where each configuration entry uses the property tag.
#B Define the string form of the XML end tag.
#C Set the Mahout XML input format class.
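
The rest of the driver lives in the book's code repository. As a rough sketch of how these lines might be wired into a full job (my own wiring rather than the book's exact code; the driver class name HadoopPropertyXMLMapReduce matches the job run later in this technique, Map is the mapper from listing 1 below, and the configuration lines above are repeated for completeness):

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();

  conf.set("xmlinput.start", "<property>");
  conf.set("xmlinput.end", "</property>");

  Job job = new Job(conf, "xml-property-extract");
  job.setJarByClass(HadoopPropertyXMLMapReduce.class);
  job.setInputFormatClass(XmlInputFormat.class);   // Mahout's XML input format
  job.setMapperClass(Map.class);                   // the mapper from listing 1 below
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.waitForCompletion(true);
}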

Looking at the previous code, it quickly becomes apparent that Mahout's XML InputFormat is rudimentary; you need to tell it an exact sequence of start and end XML tags that will be searched for in the file. Looking at the source of the InputFormat confirms this:[1]

private boolean next(LongWritable key, Text value)
    throws IOException {
  if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
    try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
        key.set(fsin.getPos());
        value.set(buffer.getData(), 0, buffer.getLength());
        return true;
      }
    } finally {
      buffer.reset();
    }
  }
  return false;
}

Next you need to write a mapper to consume Mahout's XML InputFormat. The XML element in Text form has been supplied, so you'll need to use an XML parser to extract content from the XML.

Listing 1 A mapper to work with XML

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value,
                     Context context)
      throws IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(
              new ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();
      context.write(new Text(propertyName.trim()),
                    new Text(propertyValue.trim()));
    } catch (Exception e) {
      log.error("Error processing '" + document + "'", e);
    }
  }
}

The map is given a Text instance, which contains a String representation of the data between the start and end tags. In this code you use Java's built-in Streaming API for XML (StAX) parser to extract the key and value for each property and output them. If you run the MapReduce job against Cloudera's core-site.xml and use the HDFS cat command to view the result, you'll see the following output:

$ hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml

$ bin/run.sh com.manning.hip.ch3.xml.HadoopPropertyXMLMapReduce \
  core-site.xml output

$ hadoop fs -cat output/part*
fs.default.name hdfs://localhost:8020
hadoop.tmp.dir /var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts *
hadoop.proxyuser.oozie.groups *

This output shows that you've successfully worked with XML as an input serialization format with MapReduce. Not only that—you can support huge XML files since the InputFormat supports splitting XML.

Writing XML

Having successfully read XML, the next question is how do you write it? Your reducer has callbacks that occur before and after your main reduce method is called, which you can use to emit the root start and end tags, as shown in the following example.[2]

Listing 2 A reducer to emit start and end tags

public static class Reduce
    extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(
      Context context)
      throws IOException, InterruptedException {
    context.write(new Text("<configuration>"), null);  #A
  }

  @Override
  protected void cleanup(
      Context context)
      throws IOException, InterruptedException {
    context.write(new Text("</configuration>"), null);  #B
  }

  private Text outputKey = new Text();

  public void reduce(Text key, Iterable<Text> values,
                     Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      outputKey.set(constructPropertyXml(key, value));  #C

      context.write(outputKey, null);  #D
    }
  }

  public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("<property><name>").append(name)
        .append("</name><value>").append(value)
        .append("</value></property>");
    return sb.toString();
  }
}

#A Use the setup method to write the root element start tag.
#B Use the cleanup method to write the root element end tag.
#C Construct a child XML element for each key/value combination provided in the reducer.
#D Emit the XML element.

This could also be embedded in an OutputFormat, but I'll leave that as an exercise for the reader.
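
As a rough idea of what that exercise might look like (my own sketch rather than code from the book; the class name XmlOutputFormat and the <configuration> root element are assumptions), the root-tag handling could move into a RecordWriter so the reducer only emits property elements:

// Uses org.apache.hadoop.mapreduce.lib.output.FileOutputFormat,
// org.apache.hadoop.mapreduce.RecordWriter/TaskAttemptContext,
// and org.apache.hadoop.fs.{Path, FSDataOutputStream}.
public class XmlOutputFormat extends FileOutputFormat<Text, Text> {

  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Path file = getDefaultWorkFile(job, ".xml");
    final FSDataOutputStream out =
        file.getFileSystem(job.getConfiguration()).create(file, false);

    // Write the root element start tag once, when the writer is created.
    out.write("<configuration>\n".getBytes("UTF-8"));

    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text key, Text value) throws IOException {
        // Each record is a complete <property> element built by the reducer.
        out.write(key.toString().getBytes("UTF-8"));
        out.write('\n');
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        // Close out the root element before closing the stream.
        out.write("</configuration>\n".getBytes("UTF-8"));
        out.close();
      }
    };
  }
}

With something like this in place, the reducer could drop its setup and cleanup methods and the job would simply set this class as its output format.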

Pig

If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code, detailed in chapter 10) contains an XMLLoader. It works in a way similar to this technique and captures all of the content between a start and end tag, supplying it as a single byte array field in a Pig tuple.

Hive

Currently, no means exists for working with XML in Hive. You would have to write a custom SerDe.[3]

Summary

Mahout's XML InputFormat certainly helps you work with XML. But it's sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or the generation of the element can't be controlled and could result in XML namespace qualifiers being used, then this approach may not work for you. Also problematic will be situations where the element name you specify is used as a descendant child element.

If you have control over the XML laid out in the input, this exercise can be simplified by having a single XML element per line. This will let you use the built-in MapReduce text-based InputFormats (such as TextInputFormat), which treat each line as a record and split accordingly to preserve that demarcation.
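
For example, under that assumption (a sketch reusing the classes from this technique), the job setup reduces to:

// One complete <property> element per line means the stock line-oriented
// input format is sufficient; the StAX-based mapper from listing 1 can
// parse each line unchanged.
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Map.class);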

Another option worth considering is that of a preprocessing step, where you could convert the original XML into a separate line per XML element, or convert it into an altogether different data format such as a SequenceFile or Avro, both of which solve the splitting problem for you.

Hadoop Streaming also ships with a class called StreamXmlRecordReader that allows you to work with XML in your streaming code.

Now that you have a handle on how to work with XML, let's tackle another popular serialization format, JSON.

JSON

JSON shares the machine- and human-readable traits of XML, and has existed since the early 2000s. It's less verbose than XML, and doesn't have the rich typing and validation features available in XML.

Technique: MapReduce and JSON

Imagine you have some code that's downloading JSON data from a streaming REST service and every hour writes a file into HDFS. The amount of data being downloaded is large, so each file produced is multiple gigabytes in size.

You've been asked to write a MapReduce job that can take as input these large JSON files. What you have here is a problem in two parts: first, MapReduce doesn't come with an InputFormat that works with JSON. Second, how does one even go about splitting JSON? Figure 1 shows the problem with splitting JSON. To split files, given a random offset in a file, you'll need to be able to determine the start of the next JSON element. This is made more challenging when working with JSON because it's a hierarchical data format and the same element name can be used in multiple levels, as shown in the figure.

JSON is harder to partition into distinct segments than a format such as XML because JSON doesn't have a token (like an end tag in XML) to denote the start or end of a record.

Figure 1 Example of issue with JSON and multiple input splits

Problem

You want to work with JSON inputs in MapReduce and also ensure that input JSON files can be partitioned for concurrent reads.

Solution

Elephant Bird's LzoJsonInputFormat is used as the basis for creating an input format class that works with JSON elements. This technique also covers another approach using my open source project that can work with multiline JSON.

Discussion

Elephant Bird,[4] an open source project that contains some useful utilities for working with LZOP compression, has an LzoJsonInputFormat that can read JSON, though it requires that the input file be LZOP-compressed. You'll use the Elephant Bird code as a template for your own JSON InputFormat, which doesn't have the LZOP compression requirement.

We're cheating with this solution, which assumes that each JSON record is on a separate line. My JsonInputFormat is simple and does nothing other than construct and return a JsonRecordReader, so we'll skip over that code. The JsonRecordReader emits LongWritable, MapWritable key/value pairs to the mapper, where the MapWritable is a map of JSON element names and their values. Let's take a look at how this RecordReader works. It leverages the LineRecordReader, which is a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, the reader uses the following method:[5]

public static boolean decodeLineToJson(JSONParser parser, Text line, 
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject)parser.parse(line.toString());
    for (Object key: jsonObj.keySet()) {
      Text mapKey = new Text(key.toString()); 
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }

      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}

The reader uses the json-simple[6] parser to parse the line into a JSON object, and then iterates over the keys and puts the keys and values into a MapWritable. The mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accordingly. You can view this basic code for the MapReduce job in the GitHub repository.
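
As a rough sketch of what such a mapper might look like (my own illustration rather than the repository's exact code), it only needs to walk the MapWritable and echo each JSON field name and value:

public static class Map extends Mapper<LongWritable, MapWritable, Text, Text> {

  @Override
  protected void map(LongWritable key, MapWritable value, Context context)
      throws IOException, InterruptedException {
    // The MapWritable contains one entry per JSON element: name -> value.
    for (java.util.Map.Entry<Writable, Writable> entry : value.entrySet()) {
      context.write((Text) entry.getKey(), (Text) entry.getValue());
    }
  }
}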

I'll demonstrate this technique using the following JSON:

{
  "results" : 
    [
      {
        "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000", 
        "from_user" : "grep_alex",
        "text" : "RT @kevinweil: After a lot of hard work by ..."
      },
      {
        "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000", 
        "from_user" : "grep_alex",
        "text" : "@miguno pull request has been merged, thanks again!"
      }
    ]
}

Because this technique assumes a JSON object per line, the following shows the JSON file you'll work with:

{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...
{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...

Now copy the JSON file into HDFS and run your MapReduce code. The MapReduce code writes each JSON key/value as the job output:

$ hadoop fs -put test-data/ch3/singleline-tweets.json \
  singleline-tweets.json

$ bin/run.sh com.manning.hip.ch3.json.JsonMapReduce \
  singleline-tweets.json output

$ hadoop fs -cat output/part*
text		RT @kevinweil: After a lot of hard work by ... 
from_user	grep_alex
created_at	Thu, 29 Dec 2011 21:46:01 +0000
text		@miguno pull request has been merged, thanks again!
from_user	grep_alex
created_at	Mon, 26 Dec 2011 21:18:37 +0000

Writing JSON

An approach similar to writing XML could also be used to write JSON.
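
For example, a reducer could emit one JSON object per line, mirroring listing 2. The sketch below is my own illustration (not the book's code) and leans on json-simple, which the input side already uses, to handle serialization:

public static class Reduce extends Reducer<Text, Text, Text, Text> {

  private Text outputKey = new Text();

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      // Emit one JSON object per line so the output can be read back with
      // the same line-oriented technique used for the input.
      JSONObject json = new JSONObject();
      json.put(key.toString(), value.toString());
      outputKey.set(json.toJSONString());
      context.write(outputKey, null);
    }
  }
}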

Pig

Elephant Bird contains a JsonLoader and an LzoJsonLoader, which you can use to work with JSON in Pig. These loaders work with line-based JSON. Each Pig tuple contains a chararray field for each JSON element in the line.

Hive

Hive contains a DelimitedJSONSerDe, which can serialize JSON, but, unfortunately, not deserialize it, so you can't load data into Hive using this SerDe.

Summary

This solution assumes that the JSON input is structured with a line per JSON object. How would you work with JSON objects that are across multiple lines? An experimental project on GitHub[7] works with multiple input splits over a single JSON file. This approach searches for a specific JSON member and retrieves the containing object.

You can also review a Google Code project called hive-json-serde,[8] which can support both serialization and deserialization.

Conclusion

As you can see, using XML and JSON in MapReduce is kludgy and has rigid requirements about how to lay out your data. Support for these two formats in MapReduce is also complex and error prone, since neither lends itself naturally to splitting. Clearly you need to look at alternative file formats that have built-in support for splittability.


1. GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch3/xml/XmlInputFormat.java

2. GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch3/xml/SimpleXmlOutputMapReduce.java

3. SerDe is a shortened form of Serializer/Deserializer, and is the mechanism that allows Hive to read and write data in HDFS. 

4. See https://github.com/kevinweil/elephant-bird. 

5. GitHub source—https://github.com/alexholmes/hadoop-book/blob/master/src/main/java/com/manning/hip/ch3/json/JsonInputFormat.java 

6. See http://code.google.com/p/json-simple/. 

7. A multiline JSON InputFormat: https://github.com/alexholmes/json-mapreduce. 

8. See http://code.google.com/p/hive-json-serde/. 

