MapReduce with R on Hadoop and Amazon EMR
You
all know why MapReduce is fancy – so let’s just jump right in. I like
researching data and I like to see results fast – does that mean I enjoy
the process of setting up a Hadoop cluster? No, I doubt there is any
correlation – neither causal nor merely statistical. The good news is
there are already quite a lot of cloud computing providers offering
Hadoop clusters on demand! For this article I got my hands on Amazon’s Elastic MapReduce
(EMR) service (which builds on its EC2 service) and which sets up
the Hadoop cluster for you. Okay – almost, at least. We
are going to count 2-grams in (dummy) text data using the stringdist package.
Setting Up an Amazon Web Service Account
First of all you need an account for Amazon Web Service (AWS). Well, that’s straightforward. I recommend checking out their Getting Started with AWS area.
They need the credit card because as soon as you have the account you
can do things that might cost them money. But their pricing model is
very transparent and they even offer an “AWS Free Usage Tier” that
allows you to play around with EC2 and other services for free – e.g.
with their smallest instance, the t1.micro. EMR though runs on at least
m1.small instances – and we are even going to use an m1.medium – so it will cost a few cents. And “few cents” is meant literally in this case.
In the course of setting up your account it is important that you also create access keys.
You aren’t going to use those yourself (for the purpose of the
described example) but the EMR job is going to fail prematurely
otherwise. To do so you enter the “Security Credentials” section of your
AWS console and there – exactly – you create new access keys.
The Big Data and Amazon S3
The data is two files (“input1” and “input2”) and consists of four tab-separated columns.
The first column is some sort of ID, the second a country code, the third holds the
initials/name of the text’s author, and the fourth keeps the text
we are going to have the 2-grams counted for.
content of file "input1":

2354    de    A.B.    [deutscher Text]
2355    en    B.C.    [english quote]
2356    de    C.D.    [deutscher Text]
2357    de    D.E.    [deutscher Text]
2358    en    E.F.    [english text]

content of file "input2":

8574    de    F.G.    [deutscher Text]
8575    en    G.H.    [english text]
8576    de    H.I.    [deutscher Text]
8577    de    I.J.    [deutsches Zitat]
8578    en    J.K.    [english text]
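If you want to double-check the format before uploading anything, a quick look in R does the job. This is just a sketch and assumes input1 sits in your working directory:

# the input is tab-separated and has no header:
# id, country code, author initials, text
d <- read.delim("input1", header=FALSE, sep="\t", quote="",
                stringsAsFactors=FALSE,
                col.names=c("id", "lang", "author", "text"))
str(d)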
Hadoop can be fed data either from the local file system or from HDFS.
In case of Amazon EMR though we will have to upload those two files to
Amazon’s Simple Storage Service (S3). For that purpose you enter the S3 section
of your AWS console and create a bucket. The bucket needs a globally
(this word, too, is meant literally) unique name – so be creative. I
named mine “joyofdata-test-s3”. Within that bucket I created a folder
named “input” and that is where I uploaded the two files.
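The upload can be done through the S3 web console as just described. If you prefer scripting it, the aws.s3 package offers a rough alternative – purely a sketch, not used anywhere else in this article, and it assumes your access keys are exposed via the standard AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables:

# sketch: upload the two input files into the "input" folder of the bucket
library(aws.s3)
put_object(file="input1", object="input/input1", bucket="joyofdata-test-s3")
put_object(file="input2", object="input/input2", bucket="joyofdata-test-s3")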
The Mapper (map.R) …
As we are going to realize our MapReduce job via Hadoop’s streaming API,
the provided mapper and reducer(s) have to be executables. To achieve
this, the shebang in the first line tells the shell what to run the
script with: Rscript.
The rest is quite straightforward and
usual MapReduce business. The data is fed line by line to the mapper
which dissects the current line into its tab-separated pieces, counts
the 2-grams of the text and then spits out for every combination of
country code and 2-gram a new line with the count as the value. So its
output is going to be two keys
and one value – all separated by tabs. We will have to tell Hadoop that
we use two keys; otherwise the subsequent sorting will by default only
take the first key part into account and the records will be grouped
incorrectly.
#! /usr/bin/env Rscript
# map.R

library(stringdist, quietly=TRUE)

input <- file("stdin", "r")

while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
    # in case of empty lines
    # more sophisticated defensive code makes sense here
    if(nchar(line) == 0) break

    fields <- unlist(strsplit(line, "\t"))

    # extract 2-grams
    d <- qgrams(tolower(fields[4]), q=2)

    for(i in 1:ncol(d)) {
        # language / 2-gram / count
        cat(fields[2], "\t", colnames(d)[i], "\t", d[1,i], "\n")
    }
}

close(input)
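To get a feeling for what the mapper emits, it helps to look at what stringdist’s qgrams() returns for a single text: a one-row matrix of counts whose column names are the 2-grams. A quick interactive check (the output hinted at in the comments is indicative only – the column order may differ):

library(stringdist)

d <- qgrams(tolower("[deutscher Text]"), q=2)

colnames(d)  # the 2-grams, e.g. "[d" "de" "eu" "ut" "ts" ...
d[1, ]       # their counts within the string
dim(d)       # 1 row, one column per distinct 2-gram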
… and the Reducer (reduce.R)
In the meantime Hadoop has sorted the
output of all mappers by the keys and now feeds those lines one by one to
the reducer. There we basically just collect all the counts for one
compound key, add them up and spit out a single line for every compound
key with the final count. Again all keys and values are separated by
tabs.
#! /usr/bin/env Rscript
# reduce.R

input <- file("stdin", "r")

# initialize variables that keep
# track of the state
is_first_line <- TRUE

while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
    line <- unlist(strsplit(line, "\t"))

    # current line belongs to previous
    # line's key pair
    if(!is_first_line && prev_lang == line[1] && prev_2gram == line[2]) {
        sum <- sum + as.integer(line[3])
    }
    # current line belongs either to a
    # new key pair or is first line
    else {
        # new key pair - so output the last
        # key pair's result
        if(!is_first_line) {
            # language / 2-gram / count
            cat(prev_lang, "\t", prev_2gram, "\t", sum, "\n")
        }

        # initialize state trackers
        prev_lang <- line[1]
        prev_2gram <- line[2]
        sum <- as.integer(line[3])
        is_first_line <- FALSE
    }
}

# the final record (only if we saw any input at all)
if(!is_first_line) {
    cat(prev_lang, "\t", prev_2gram, "\t", sum, "\n")
}

close(input)
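Conceptually the reducer is nothing more than a grouped sum over the compound key. For a small data set you could compute the same result in plain R – a sketch, assuming you captured the mapper output in a file called mapped.tsv (that file name is made up for this illustration):

# read the mapper output: language, 2-gram, count (tab-separated)
mapped <- read.delim("mapped.tsv", header=FALSE, sep="\t", quote="",
                     stringsAsFactors=FALSE,
                     col.names=c("lang", "twogram", "count"))

# group by (language, 2-gram) and sum the counts -
# the same aggregation reduce.R performs in a streaming fashion
result <- aggregate(count ~ lang + twogram, data=mapped, FUN=sum)
print(result)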
We now upload both files – map.R and reduce.R – to S3, into a new folder of our bucket named “scripts”.
Testing the Code at the Shell
Before we bother Amazon with our
brilliant code it makes sense to test it locally – without Hadoop – by
simulating the MapReduce job pipeline:
# make the scripts executable
> chmod 755 map.R reduce.R

# the pipe
> cat input1 input2 | ./map.R | sort | ./reduce.R
de   at   1
de   ch   3
de   [d   3
de   de   3
...
cat feeds the data line by line to map.R,
which prints one line per country code / 2-gram combination. These lines are then sorted by their full
content and then fed to reduce.R in an ordered fashion. Okay, that seems
to be alright.
Bootstrapping the Hadoop Cluster
If we ran this code on EMR now,
it would fail. That’s because the EC2 instances that get set up come with R but
do not include the stringdist package. The stringdist package needs at
least R 2.15.3, and that is also why I am going to use an AMI
(Amazon Machine Image) of major version 3. Version 2 AMIs ship
with R 2.11, while those of version 3 provide at least R 3.0.1. Of
course it would also be possible to install not just an R package but a newer version of R itself
– I leave this as an exercise to the reader. So I submit to the lord
of opportunism and just go with the newest AMI, and Bob’s your uncle.
Actually I went through quite some pain
until I figured out how to install stringdist successfully and
eventually found two possible options:
#!/bin/bash
wget http://cran.us.r-project.org/src/contrib/stringdist_0.7.2.tar.gz
sudo R CMD INSTALL stringdist_0.7.2.tar.gz
#!/bin/bash
# a server close to Dublin where my Amazon computing center resides
url="http://ftp.heanet.ie/mirrors/cran.r-project.org/"
sudo Rscript -e "install.packages('stringdist', repos='${url}')"
Choose the one you like best, name it bootstrap.R.sh and upload it into the “scripts” folder of the S3 bucket as well.
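If you want the bootstrap step to fail loudly in case the installation did not work, you could append a small check to bootstrap.R.sh and run it via Rscript as the last step. This is my own addition, not part of the original bootstrap actions – just a sketch:

# exits with a non-zero status if stringdist cannot be loaded,
# which makes the bootstrap action (and thus the cluster setup) fail early
if (!suppressWarnings(require("stringdist", quietly=TRUE))) {
    quit(save="no", status=1)
}
cat("stringdist", as.character(packageVersion("stringdist")), "is installed\n")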
Hey Ho Let’s Go
Now that we have made our pile we are ready for some action. So let’s all head to the EMR section of our AWS console and “Create (a) cluster”. I will take the defaults and tell you about the adjustments I made:
- Cluster Configuration: choose Cluster Name and a folder for the log files.
- Software Configuration: choose AMI 3.0.3 or higher. Get rid of Pig and Hive.
- Hardware Configuration: One Master (m1.medium). No Core, no Task. We don’t need a Core or Task instance for this simple example so sparing those will save us a few cents.
- Bootstrap Actions: choose “Custom action” and then select the bootstrap script.
- Steps: choose “Streaming program” and configure it. Select the mapper and the reducer scripts and the input folder. Also choose an output folder – which does not yet exist (and use an s3-url, of course)! Now enter the following two lines into the “Arguments” box so Hadoop knows that the first two tab-separated terms coming from the mapper are keys and the final one a value:
-jobconf stream.num.map.output.key.fields=2
-jobconf map.output.key.field.separator=\t
And finally: “Create cluster”!
Inspecting the Results
After waiting for approximately 10
minutes – I guess (hope) it’s the bootstrapping and further maintenance
that takes so long :) – we can finally enjoy the full glory of our
endeavour by opening output/part-00000:
de   t    5
de   z    1
de   [d   6
de   at   1
de   ch   6
...
As the AWS pricing model is by the hour
you could add another step and run it on the same cluster. If you do not
intend to do so I recommend terminating it.
The call to hadoop-streaming.jar may be found in /log/[ID]/steps/[N]/controller:
/home/hadoop/contrib/streaming/hadoop-streaming.jar
    -files s3://joyofdata-test-s3/scripts/map.R,s3://joyofdata-test-s3/scripts/reduce.R
    -mapper map.R
    -reducer reduce.R
    -input s3://joyofdata-test-s3/input/
    -output s3://joyofdata-test-s3/output/
    -jobconf stream.num.map.output.key.fields=2
    -jobconf map.output.key.field.separator=\t
R and STDIN and -OUT
MapReduce streaming is based on reading
from stdin and writing to stdout. For that reason we need full control
over what R writes to stdout. A lot of R functions recklessly blurt
out their stuff, which goes to stdout and then enters the MapReduce
pipeline. Preventing this is sadly not always straightforward. In the case of library() it can be achieved by setting quietly to TRUE. Some people suggest calling sink("/dev/null") and, whenever you actually want to write to stdout, restoring it beforehand with sink(). I couldn’t verify that this actually works though.
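For reference, here is what that sink() pattern would look like inside a streaming script. It is only a sketch of the suggestion above – noisy_function() is a placeholder for whatever call pollutes stdout – and, as said, I haven’t verified it on EMR:

# send stdout to /dev/null while the noisy code runs ...
sink("/dev/null")
noisy_function()   # placeholder for a call that prints to stdout
sink()             # ... then restore stdout

# only now emit the actual key/value record for Hadoop
cat("de", "\t", "at", "\t", 1, "\n")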