MapReduce with R on Hadoop and Amazon EMR

You all know why MapReduce is fancy – so let’s just jump right in. I like researching data and I like to see results fast – does that mean I enjoy the process of setting up a Hadoop cluster? No, I doubt there is any correlation – neither causal nor merely statistical. The good news is there are already quite a lot of cloud computing providers offering Hadoop clusters on demand! For this article I got my hands on Amazon’s Elastic MapReduce (EMR) service (which is an extension of its EC2 service) that sets up the Hadoop cluster for you. Okay – almost at least. For this article we are going to count 2-grams in (dummy text) data using the stringdist library.

Setting Up an Amazon Web Service Account

First of all you need an account for Amazon Web Services (AWS). Well, that’s straightforward. I recommend checking out their Getting Started with AWS area. They need your credit card details because as soon as you have the account you can do things that might cost them money. But their pricing model is very transparent and they even offer an “AWS Free Usage Tier” that allows you to play around with EC2 and other services for free – for example with their smallest instance, the t1.micro. EMR though runs on at least m1.small instances – and we are even going to use an m1.medium – so it will cost a few cents. And “few cents” is meant literally in this case.
In the course of setting up your account it is important that you also create access keys. You aren’t going to use those yourself (for the purpose of the described example) but the EMR job is going to fail prematurely otherwise. To do so you enter the “Security Credentials” section of your AWS console and there – exactly – you create fresh access keys.

The Big Data and Amazon S3

The data comes in two files (“input1” and “input2”), each consisting of four tab-separated columns. The first column is some sort of ID, the second a country code, the third holds the initials/name of the text’s author and the fourth keeps the text we are going to count the 2-grams for.
content of file "input1":

2354    de    A.B.    [deutscher Text]
2355    en    B.C.    [english quote]
2356    de    C.D.    [deutscher Text]
2357    de    D.E.    [deutscher Text]
2358    en    E.F.    [english text]

content of file "input2":

8574    de    F.G.    [deutscher Text]
8575    en    G.H.    [english text]
8576    de    H.I.    [deutscher Text]
8577    de    I.J.    [deutsches Zitat]
8578    en    J.K.    [english text]
Hadoop can be fed data either from disk or from HDFS. In case of Amazon EMR though we will have to upload those two files to Amazon’s Simple Storage Service (S3). For that purpose you enter the S3 section of your AWS console and create a bucket. The bucket needs a globally (also this word is meant literally) unique name – so be creative. I named mine “joyofdata-test-s3”. Within that bucket I created a folder named “input” and that’s where I uploaded the two files to.
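
If you prefer the shell over the console, the same upload can be sketched with the AWS command line interface. This is not what I did for this article, so take it as a minimal sketch: it assumes the AWS CLI ("aws") is installed and configured with your access keys, and the function name upload_inputs and the AWS_CMD switch are just made up for illustration.

```shell
#!/bin/bash
# Assumption: the AWS CLI ("aws") is installed and configured.
# Set AWS_CMD=echo to only print the commands instead of running them.
AWS_CMD="${AWS_CMD:-aws}"

# upload_inputs BUCKET: create the bucket and copy both input files
# into its "input" folder (hypothetical helper, not part of the AWS CLI).
upload_inputs() {
  bucket="$1"
  "$AWS_CMD" s3 mb "s3://${bucket}"
  for f in input1 input2; do
    "$AWS_CMD" s3 cp "$f" "s3://${bucket}/input/"
  done
}

# Example call (the bucket name has to be globally unique):
# upload_inputs joyofdata-test-s3
```

Running it once with AWS_CMD=echo is a cheap way to check the bucket name and paths before anything is actually created.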

The Mapper (map.R) …

[Figure: Simplified Illustration of the MapReduce Example]
As we are going to realize our MapReduce job via Hadoop’s streaming API – the provided mapper and reducer(s) have to be executables. To achieve this, the shebang in the first line tells the system what to make of this script – running it through Rscript.
The rest is quite straightforward and usual MapReduce business. The data is fed line by line to the mapper which dissects the current line into its tab-separated pieces, counts the 2-grams of the text and then spits out for every combination of country code and 2-gram a new line with the count as the value. So its output is going to be two keys and one value – all separated by tabs. We are going to have to tell Hadoop that we use two keys; otherwise the subsequent sorting by default only takes the first key part into account and the sorting will be wrong.
#! /usr/bin/env Rscript
# map.R

library(stringdist, quietly=TRUE)

input <- file("stdin", "r")

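while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
   # skip empty lines instead of aborting the whole input;
   # more sophisticated defensive code makes sense here
   if(nchar(line) == 0) next
   
   fields <- unlist(strsplit(line, "\t"))
     
   # extract 2-grams (one column per distinct 2-gram)
   d <- qgrams(tolower(fields[4]), q=2)
   
   # seq_len() also copes with texts that yield no 2-grams
   for(i in seq_len(ncol(d))) {
     # language / 2-gram / count, separated by tabs only
     # (sep="" keeps cat from padding the fields with spaces)
     cat(fields[2], "\t", colnames(d)[i], "\t", d[1,i], "\n", sep="")
   }
}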

close(input)
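
To see why both key fields matter for the sorting, here is a small shell sketch with made-up mapper output. It is only an analogy for what Hadoop does, not its actual implementation, and it assumes a sort that knows the -s (stable) flag, as GNU and BSD sort do. Sorting on the first field alone keeps all "de" lines together but leaves the two "de / ch" lines apart; sorting on the first two fields makes them neighbours, which is what the reducer relies on.

```shell
#!/bin/bash
# Made-up mapper output: language <TAB> 2-gram <TAB> count
mapper_output=$(printf 'de\tch\t1\nen\tth\t2\nde\tat\t1\nde\tch\t2\n')
tab=$(printf '\t')

# Only the first field is the key: lines with the same language end up
# together, but equal (language, 2-gram) pairs are not necessarily adjacent.
by_first_key=$(printf '%s\n' "$mapper_output" | LC_ALL=C sort -s -t "$tab" -k1,1)

# The first two fields form the key: equal pairs are now adjacent.
by_both_keys=$(printf '%s\n' "$mapper_output" | LC_ALL=C sort -s -t "$tab" -k1,2)

echo "sorted by first key only:"
printf '%s\n' "$by_first_key"
echo "sorted by both keys:"
printf '%s\n' "$by_both_keys"
```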

… and the Reducer (reduce.R)

In the meantime Hadoop sorted the output of all mappers by the keys and now feeds those line by line to the reducer. Now we basically just collect all the counts for one compound key, add them up and spit out a single line for every compound key with the final count. Again all keys and values are separated by tabs.
#! /usr/bin/env Rscript
# reduce.R

input <- file("stdin", "r")

# initialize variables that keep
# track of the state

is_first_line <- TRUE

while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
   line <- unlist(strsplit(line, "\t"))
   
   if(!is_first_line && 
        prev_lang == line[1] && 
        prev_2gram == line[2]) {
       # current line belongs to previous
       # line's key pair
       total <- total + as.integer(line[3])   
   } else {
       # current line belongs either to a
       # new key pair or is first line
       
       # new key pair - so output the last
       # key pair's result
       if(!is_first_line) {
           # language / 2-gram / count
           cat(prev_lang, "\t", prev_2gram, "\t", total, "\n", sep="")
       } 
   
       # initialize state trackers
       # ("total" avoids shadowing R's sum() function)
       prev_lang <- line[1]
       prev_2gram <- line[2]
       total <- as.integer(line[3])
       is_first_line <- FALSE
   }
}

# the final record (only if there was any input at all)
if(!is_first_line) {
   cat(prev_lang, "\t", prev_2gram, "\t", total, "\n", sep="")
}

close(input)
Both files – map.R and reduce.R – we now upload to S3 into a new folder of our bucket named “scripts”.

Testing the Code at the Shell

Before we bother Amazon with our brilliant code it makes sense to test it locally – without Hadoop – by simulating the MapReduce job pipeline:
# make the scripts executable
> chmod 755 map.R reduce.R

# the pipe
> cat input1 input2 | ./map.R | sort |  ./reduce.R
de        at       1 
de        ch       3 
de        [d       3 
de        de       3 
...
cat feeds the data line by line to map.R which outputs its lines. These are then sorted using the full lines’ content and then fed to reduce.R in an ordered fashion. Okay, that seems to be alright.
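
If you want a second opinion on the reducer, the same aggregation can be sketched in a few lines of awk and compared with what reduce.R prints. This is just a cross-check I would consider handy, not part of the job itself; the function name sum_counts is made up, and it assumes the three tab-separated fields described above.

```shell
#!/bin/bash
# sum_counts: reads "language<TAB>2-gram<TAB>count" lines on stdin and
# prints one line per (language, 2-gram) pair with the summed count.
# The input does not need to be sorted; the output is sorted bytewise.
sum_counts() {
  awk -F '\t' -v OFS='\t' '
    { totals[$1 FS $2] += $3 }                         # add up per key pair
    END { for (key in totals) print key, totals[key] }
  ' | LC_ALL=C sort
}

# Small self-contained demonstration with made-up mapper output:
printf 'de\tch\t1\nen\tth\t2\nde\tat\t1\nde\tch\t2\n' | sum_counts
```

Piping the real mapper output into it (cat input1 input2 | ./map.R | sum_counts) should give the same counts per key pair as the pipeline above, apart from possible differences in whitespace and line order.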

Bootstrapping the Hadoop Cluster

If we ran this code with EMR now it would fail. That’s because the EC2 instances that get set up come with R but do not feature the stringdist package. The stringdist package needs at least R 2.15.3 and that is also why I am going to use an AMI (Amazon Machine Image) of major version 3. The AMIs of version 2 are shipped with R 2.11 and those of version 3 at least provide R 3.0.1. But of course it would be possible to not just install an R package but a newer version of R itself – I leave this as an exercise to the reader. So, I submit to the lord of opportunism and just go with the newest AMI and Bob’s your uncle.
Actually I went through quite some pain until I figured out how to set up stringdist successfully and eventually found two possible options. The first one installs a specific source tarball:
#!/bin/bash

wget http://cran.us.r-project.org/src/contrib/stringdist_0.7.2.tar.gz

sudo R CMD INSTALL stringdist_0.7.2.tar.gz

The second one installs the package from a CRAN mirror:
#!/bin/bash

# a server close to Dublin where my Amazon computing center resides
url="http://ftp.heanet.ie/mirrors/cran.r-project.org/"

sudo Rscript -e "install.packages('stringdist',repos='${url}')"
Choose the one you like best, name it bootstrap.R.sh and upload it into the “scripts” folder of the S3 bucket as well.

Hey Ho Let’s Go

Now that we have made our preparations we are ready for some action. So let’s all head to the EMR section of our AWS console and “Create (a) cluster”. I will take the defaults and tell you about the adjustments I made:
  1. Cluster Configuration: choose Cluster Name and a folder for the log files.
  2. Software Configuration: choose AMI 3.0.3 or higher. Get rid of Pig and Hive.
  3. Hardware Configuration: One Master (m1.medium). No Core, no Task. We don’t need a Core or Task instance for this simple example so sparing those will save us a few cents.
  4. Bootstrap Actions: choose “Custom action” and then select the bootstrap script.
  5. Steps: choose “Streaming program” and configure it. Select the mapper and the reducer scripts and the input folder. Also choose an output folder – which does not yet exist (and use an s3-url, of course)! Now enter the following two lines into the “Arguments” box so Hadoop knows that the first two tab-separated terms coming from the mapper are keys and the final one a value:
-jobconf stream.num.map.output.key.fields=2
-jobconf map.output.key.field.separator=\t
And finally: “Create cluster”!

Inspecting the Results

After waiting for approximately 10 minutes – I guess (hope) it’s the bootstrapping and further maintenance that takes so long :) – we can finally enjoy the full glory of our endeavour by opening output/part-00000:
de         t       5 
de         z       1 
de        [d       6 
de        at       1 
de        ch       6 
...
As the AWS pricing model is by the hour you could add another step and run it on the same cluster. If you do not intend to do so I recommend terminating it.
The call to hadoop-streaming.jar may be found in /log/[ID]/steps/[N]/controller:
/home/hadoop/contrib/streaming/hadoop-streaming.jar 
  -files s3://joyofdata-test-s3/scripts/map.R,
         s3://joyofdata-test-s3/scripts/reduce.R 
  -mapper map.R 
  -reducer reduce.R 
  -input s3://joyofdata-test-s3/input/ 
  -output s3://joyofdata-test-s3/output/ 
  -jobconf stream.num.map.output.key.fields=2 
  -jobconf map.output.key.field.separator=\t

R and STDIN and -OUT

MapReduce streaming is based on reading from stdin and writing to stdout. For that reason we need full control over what R writes to stdout. A lot of R functions recklessly blurt out their stuff to stdout, from where it enters the MapReduce pipeline. Preventing this is sadly not always straightforward. In the case of library() it can be achieved by setting quietly to TRUE. Some people suggest calling sink("/dev/null") and then, whenever you want to write to stdout, resetting the redirection beforehand with sink(). I couldn’t verify that this actually works though.
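
One way to at least notice stray output before it reaches Hadoop is to check the mapper’s stdout locally. The following is a minimal sketch of such a check; the function name check_mapper_output is made up, and it simply assumes that every legitimate line consists of exactly three tab-separated fields.

```shell
#!/bin/bash
# check_mapper_output: reads mapper output on stdin and returns a non-zero
# exit status if any line is not made of exactly three tab-separated fields.
# Offending lines are reported on stderr.
check_mapper_output() {
  awk -F '\t' '
    NF != 3 {
      printf "line %d is not key/key/value: %s\n", NR, $0 > "/dev/stderr"
      bad = 1
    }
    END { exit (bad ? 1 : 0) }
  '
}

# Clean output passes silently ...
printf 'de\tch\t3\nen\tth\t2\n' | check_mapper_output && echo "clean"

# ... while a stray message from some R function is caught.
printf 'Loading required package: foo\nde\tch\t3\n' | check_mapper_output \
  || echo "stray output detected"
```

Locally this can be hooked into the test pipeline, for instance as cat input1 input2 | ./map.R | check_mapper_output.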
