Hadoop/R Integration I: Streaming
If you've spent any time with MapReduce frameworks, you probably know by now that the word-count example is the MapReduce equivalent of "Hello World!". In earlier posts, I tried to be slightly different, but with an equally simple problem: counting up the total dollar value, by state, of new issues of mortgage-backed securities pooled by Fannie Mae.
I have used "straight" Java and Pig so far, and now I'll turn my attention to R. After our example, we'll discuss what makes R unique in this situation, and why a word-count type of example doesn't really do R justice. My main references here are Joseph Adler's R in a Nutshell (see chapter 26) and Tom White's Hadoop: The Definitive Guide (chapter 2).
There are a number of ways to use R with Hadoop, including:
- Hadoop streaming, the subject of this post
- RHadoop, an R/Hadoop integration (see the RHadoop Wiki), the subject of a future post
- RHIPE (pronounced hree-pay), another R/Hadoop integration.
Overview
In Hadoop streaming, your mapper, reducer, and optional combiner processes are written to read from standard input and to write to standard output. Once the process scripts and data are ready, you simply invoke Hadoop using its streaming binaries along with some command-line properties.
As in previous posts, I'll be taking data from Fannie Mae's New Issue Pool Statistics (NIPS) files. For more info, see a previous post. I'll be using the same data as in that post, so we can expect an exact match on the results.
The Mapper
NIPS files are a little interesting, in that they contain a number of differently-formatted records (check here for the formats). To perform our analysis, we will be looking at record type 9, "GEOGRAPHIC DISTRIBUTION". We will be interested in columns 3 (state name) and 6 (aggregate unpaid balance). Since numerous record formats are mixed within a single file, we'll first split the file on the pipe delimiters and discard the records that are not of type 9. All we need to do is output the state name and the aggregate unpaid balance, one instance per type-9 line.
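The filtering step described above can also be tried on an in-memory character vector before wiring anything to stdin. The following is only a sketch: the sample lines are invented and merely mimic a pipe-delimited layout with the record type in field 2, the state in field 3, and the unpaid balance in field 6, and the function name extract.type9 is hypothetical.

```r
# Hypothetical helper: keep type-9 records and build "STATE<TAB>BALANCE" strings.
extract.type9 <- function(lines) {
  fields <- strsplit(lines, "|", fixed = TRUE)
  # Field 2 is the record type; blank or malformed lines give NA and are dropped.
  rec.type <- suppressWarnings(
    as.numeric(vapply(fields, function(f) f[2], character(1)))
  )
  keep <- !is.na(rec.type) & rec.type == 9
  state <- vapply(fields[keep], function(f) f[3], character(1))
  # Strip dollar signs and thousands separators from the balance.
  upb <- vapply(fields[keep], function(f) gsub("[$,]", "", f[6]), character(1))
  paste(state, upb, sep = "\t")
}

# Invented sample input, not real NIPS data.
sample.lines <- c(
  "POOL1|1|HEADER|x|y|z",
  "POOL1|9|CALIFORNIA|1|50.00|$167,300.00",
  "",
  "POOL1|9|NEW JERSEY|2|100.00|$284,550.00"
)
mapped <- extract.type9(sample.lines)
writeLines(mapped)
```

Unlike a bare if() test on as.numeric(), this version silently skips blank lines and non-numeric record types instead of stopping with a missing-value error.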
To develop my R scripts, I'm using RStudio, an IDE I learned of through Roger Peng's Computing for Data Analysis course on Coursera. After an interactive script-building session in RStudio, I produced the following test script:
#! /usr/bin/env Rscript
# Test version: reads one NIPS file directly instead of stdin.
conn <- file("/home/hduser/fannie-mae-nips/nips_12262012.txt", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  # Keep only record type 9 (geographic distribution).
  if (as.numeric(split.line[[1]][2]) == 9) {
    # Emit state name and unpaid balance, tab-separated.
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)
which I then invoked from a shell and got the following output, truncated:
CALIFORNIA 167300.00
FLORIDA 395950.00
GEORGIA 69500.00
ILLINOIS 235200.00
MICHIGAN 781950.00
NEW JERSEY 284550.00
OHIO 334175.00
Since this looks clean, I modified the mapper slightly to produce the final version:
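#! /usr/bin/env Rscript
# Final version: reads records from standard input, as streaming requires.
conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  # Keep only record type 9 (geographic distribution).
  if (as.numeric(split.line[[1]][2]) == 9) {
    # Emit state name and unpaid balance, tab-separated.
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)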
Note the interesting subscripting to get the results of strsplit: strsplit returns a list, so field 2 of the file record is actually element 2 of the first element of the list, which is a vector of parsed fields. If you need some clarification of this result, see the "Subscripting" chapter from Phil Spector's Data Manipulation with R. Also note the compact usage of gsub to remove the dollar signs and commas from the aggregate unpaid balances.
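To make the subscripting concrete, here is a small sketch you can paste into an R session. The record is a made-up line that only imitates the pipe-delimited layout; it is not taken from a real NIPS file.

```r
# Made-up record, for illustration only.
rec <- "POOL1|9|CALIFORNIA|1|50.00|$167,300.00"

parts <- strsplit(rec, "\\|")
class(parts)      # "list": one element per input string
length(parts)     # 1, because we passed a single string

# parts[1] is still a list; parts[[1]] is the character vector of fields.
is.list(parts[1])
is.character(parts[[1]])

state <- parts[[1]][3]                      # "CALIFORNIA"
balance <- gsub("[$,]", "", parts[[1]][6])  # "167300.00"
as.numeric(balance)
```

The character class [$,] matches either a dollar sign or a comma, so a single gsub call removes both.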
The Reducer
Our reducer will also be reading from stdin, with the following guaranteed by the Hadoop runtime:
- If a key is encountered by the reducer, then the reducer knows that all records with that key are being sent to this reducer, so it can produce an output with the knowledge that it has been given all the records for that key;
- Incoming records are sorted by key, so the reducer knows that, when a key changes, then all records for the previous key have already been encountered in the stream.
#! /usr/bin/env Rscript
# Sum the unpaid balances per key; input arrives sorted by key.
current.key <- NA
current.upb <- 0.0
conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\t")
  key <- split.line[[1]][1]
  upb <- as.numeric(split.line[[1]][2])
  if (is.na(current.key)) {
    # First record: start the first running total.
    current.key <- key
    current.upb <- upb
  } else if (current.key == key) {
    current.upb <- current.upb + upb
  } else {
    # Key changed: the previous key is complete, so emit it.
    write(paste(current.key, current.upb, sep="\t"), stdout())
    current.key <- key
    current.upb <- upb
  }
}
# Emit the last key, unless the reducer received no input at all.
if (!is.na(current.key)) {
  write(paste(current.key, current.upb, sep="\t"), stdout())
}
close(conn)
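For small inputs, the per-key totals can be cross-checked in memory with tapply. This is not how the streaming reducer works (the reducer keeps only one running total and relies on sorted input), just a convenient reference that does not care about order. The mapper output below is invented for illustration.

```r
# Invented mapper output, deliberately unsorted.
mapped <- c("OHIO\t100.50", "ALABAMA\t10.00", "OHIO\t200.25", "ALABAMA\t5.00")

parts <- strsplit(mapped, "\t", fixed = TRUE)
key <- vapply(parts, `[`, character(1), 1)              # state names
upb <- as.numeric(vapply(parts, `[`, character(1), 2))  # balances

# Group the balances by key and sum each group.
totals <- tapply(upb, key, sum)

writeLines(paste(names(totals), totals, sep = "\t"))
```

If the streaming reducer and this in-memory version disagree on a small file, the sort step or the key-change logic is the first place to look.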
Now, I'd like to test this reducer on a single file, but I have a small issue: my mapper does not sort its output (it doesn't need to, of course), but my reducer expects the data to be sorted by key. I could just wait and see how the final numbers come out, but since streaming just involves piping stdout to stdin, I'm a little curious about how fast this task could run outside of Hadoop (not as a real comparison against a simple single-node cluster; I'm just curious). And since I'm still learning R, I next write a script to sort the rows by record key:
#! /usr/bin/env Rscript
conn <- file("stdin", open="r")
all.lines <- readLines(conn)
write(sort(all.lines), stdout())
close(conn)
At times like this, I remember why I like R so much! Next, I process a single file with my "test" version of the mapper:
./map.test.R | ./map.output.sorter.R | ./reduce.R
and get output like the following (abbreviated) for a single NIPS file:
ALABAMA 72699735.21
ALASKA 6883209.62
ARIZONA 287482321.1
ARKANSAS 21579003.98
CALIFORNIA 1811342276.77
...
VIRGIN ISLANDS 1021750
WASHINGTON 239648997.97
WEST VIRGINIA 9925894.94
WISCONSIN 72752945.87
WYOMING 6232557.56
Streaming in Hadoop with R
Now that we have a mapper and a reducer, we can process the entire data set in Hadoop. I will process the same set of data as I did in my previous Hadoop-Java-Pig comparison post, meaning NIPS data from 23 August to 26 December 2012. As in that post, I am running Hadoop in pseudo-distributed mode, with the data coming from HDFS. The difference here, of course, is that I am specifying streaming, and providing my mapper and reducer R scripts. I launch from the Hadoop home directory:
bin/hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.1.0.jar \
    -input /user/hduser/fannie-mae-nips \
    -output /user/hduser/fannie-mae-nips-r-output \
    -mapper /home/hduser/RScripts/map.R \
    -reducer /home/hduser/RScripts/reduce.R
So, what did I get for my efforts? Copying my results file from HDFS:
bin/hadoop dfs -copyToLocal /user/hduser/fannie-mae-nips-r-output/part-00000 rResults.txt
yields the following output (abbreviated here):
ALABAMA 3242681838.89999
ALASKA 841797447.200001
ARIZONA 9340767235.06001
ARKANSAS 1452136751.9
CALIFORNIA 89114642822.0799
...
VERMONT 553060435.67
VIRGIN ISLANDS 33604327.46
VIRGINIA 12706719836.48
WASHINGTON 13194248475.54
WEST VIRGINIA 486889587.57
WISCONSIN 8140391871.79
WYOMING 720905726.84
I still have the outputs from my previous post on this same data set, using Java and Pig. Those results look like the following (note that I did not diff the files, because the numbers were written in a different format):
ALABAMA 3.242681838899994E9
ALASKA 8.417974472000003E8
ARIZONA 9.340767235060005E9
ARKANSAS 1.452136751900001E9
CALIFORNIA 8.91146428220799E10
....
VERMONT 5.530604356700001E8
VIRGIN ISLANDS 3.360432746000001E7
VIRGINIA 1.2706719836479996E10
WASHINGTON 1.319424847554002E10
WEST VIRGINIA 4.868895875700002E8
WISCONSIN 8.140391871790002E9
WYOMING 7.209057268400007E8
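Although a textual diff is not useful here, the two formats can be compared numerically. The sketch below copies three rows from the listings above, parses both notations with as.numeric, and checks the relative difference. The 1e-12 threshold is my own arbitrary choice, not something taken from either tool.

```r
# Three rows copied from the R streaming output above.
r.out <- c(ALABAMA = "3242681838.89999",
           ALASKA  = "841797447.200001",
           VERMONT = "553060435.67")

# The same rows from the Java/Pig output, in scientific notation.
java.out <- c(ALABAMA = "3.242681838899994E9",
              ALASKA  = "8.417974472000003E8",
              VERMONT = "5.530604356700001E8")

r.num <- as.numeric(r.out)        # as.numeric accepts both notations
java.num <- as.numeric(java.out)

# Relative difference per state; the threshold is an assumed tolerance.
rel.diff <- abs(r.num - java.num) / abs(java.num)
all(rel.diff < 1e-12)

# Rounded to cents, both sources print identically.
sprintf("%.2f", r.num)
sprintf("%.2f", java.num)
```

The small discrepancies in the trailing digits come from how each tool prints floating-point sums, not from different totals.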
So, I successfully duplicated the Java and Pig examples using R and Hadoop streaming.
Final Comments about Hadoop and R
If you are at all familiar with R, you understand that R isn't a language you pick up just to split lines of output and sum numbers; the language and its libraries contain a wealth of functionality. The point of this post was primarily to work through the mechanical details of using R with Hadoop streaming. Where R would really shine is if we had some "heavy lifting" to do with R that was easily decomposable into map-style and reduce-style tasks. For example, if you were fitting a linear regression against a huge data set, using a large number of variables, or if you were performing a Shapiro-Wilk test against a large data set, the ability to split up the job into parallel tasks, combining them at the end with a reducer, would be a great example of Hadoop/R synergy. For more information on parallel computation in R, see chapter 26 of Joseph Adler's R in a Nutshell, especially his "Where to Learn More" section at the end of the chapter.
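As one illustration of how the regression case might decompose, here is a minimal sketch assuming ordinary least squares solved through the normal equations. Each "map" step computes the cross-products X'X and X'y for its chunk of rows, and the "reduce" step sums them and solves for the coefficients. The data is simulated, the four chunks merely stand in for Hadoop input splits, and nothing here runs on Hadoop. This is not how lm() or any particular package does it; the normal equations are also less numerically robust than the QR approach lm() uses.

```r
set.seed(42)

# Simulated data: y depends linearly on x1 and x2 plus a little noise.
n <- 1000
x1 <- rnorm(n)
x2 <- rnorm(n)
y <- 1 + 2 * x1 - 3 * x2 + rnorm(n, sd = 0.1)
X <- cbind(1, x1, x2)   # design matrix with an intercept column

# Pretend the rows arrive in four separate input splits.
chunks <- split(seq_len(n), rep(1:4, length.out = n))

# "Map": each chunk produces its own small cross-product matrices.
partials <- lapply(chunks, function(idx) {
  Xc <- X[idx, , drop = FALSE]
  list(xtx = crossprod(Xc), xty = crossprod(Xc, y[idx]))
})

# "Reduce": sum the partial results, then solve (X'X) beta = X'y.
xtx <- Reduce(`+`, lapply(partials, `[[`, "xtx"))
xty <- Reduce(`+`, lapply(partials, `[[`, "xty"))
beta <- as.vector(solve(xtx, xty))

# Compare with a single in-memory fit.
fit <- lm(y ~ x1 + x2)
all.equal(beta, unname(coef(fit)))
```

The appeal of this shape is that each partial result is only a 3-by-3 matrix and a 3-element vector, no matter how many rows the chunk holds.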
Published at DZone with permission of Wayne Adams, DZone MVB. See the original article here.