My first mapreduce job
Conceptually, mapreduce is not very different from a combination of lapplys and a tapply: transform the elements of a list, compute an index (a key, in mapreduce jargon) and process the groups thus defined. Let's start with a simple lapply example:

small.ints = 1:1000
sapply(small.ints, function(x) x^2)

And now the mapreduce equivalent:
small.ints = to.dfs(1:1000)
mapreduce(
input = small.ints,
map = function(k, v) cbind(v, v^2))
This is all it takes to write your first mapreduce job in rmr. There are some differences that we will review, but the first thing to notice is that it isn't all that different from the plain R version, and it takes just two lines of code.
The first line puts the data into HDFS, where the bulk of the data has to reside for mapreduce to operate on it. It is not possible to write out big data with to.dfs in a scalable way, but to.dfs is nonetheless very useful for writing test cases, learning and debugging. to.dfs can put the data in a file of your own choosing; if you don't specify one, it will create temp files and clean them up when done. The return value is something we call a big data object. You can assign it to variables, pass it to other rmr functions and mapreduce jobs, or read it back in. It is a stub, that is, the data is not in memory; only some information that helps find and manage the data is. This way you can refer to very large data sets whose size exceeds memory limits.
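As a hedged sketch (the path is made up, and the name of the destination argument, output here, may differ between rmr versions), to.dfs can also write to a location of your choosing:

my.ints = to.dfs(1:1000, output = "/tmp/small.ints")   # hypothetical HDFS location
# my.ints is still just a stub pointing at the data, not the data itself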
Now onto the second line: it has mapreduce replacing lapply. We prefer named arguments with mapreduce because there are quite a few possible arguments, but they are not mandatory. The input is the variable small.ints, which contains the output of to.dfs, that is, a stub for our small number data set in its HDFS version, but it could also be a file path or a list containing a mix of both. The function to apply, which is called a map function (as opposed to the reduce function, which we are not using here), is a regular R function with a few constraints:

- It's a function of two arguments, a collection of keys and one of values.
- It returns key-value pairs using the function keyval, which can have vectors, lists, matrices or data.frames as arguments; you can also return NULL. You can avoid calling keyval explicitly, in which case the return value x will be converted with a call to keyval(NULL, x) (see the sketch right after this list). This is not allowed in the map function when a reduce function is specified, and under no circumstance in the combine function, since specifying the key is necessary for the shuffle phase.
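Here is a minimal sketch of the two return styles, reusing the squaring map from above (the function names are just for illustration):

map.explicit = function(k, v) keyval(v, v^2)   # explicit key-value pairs: the keys are v, the values v^2
map.implicit = function(k, v) cbind(v, v^2)    # the returned matrix x is converted to keyval(NULL, x)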
To read the results of a job back from HDFS we have from.dfs. from.dfs is complementary to to.dfs and returns a collection of key-value pairs. from.dfs is useful in defining mapreduce algorithms whenever a job produces something of reasonable size, like a summary, that can fit in memory and needs to be inspected to decide on the next steps, or to visualize it. It is much more important than to.dfs in production work.
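For example, a hedged sketch of reading the output of the squares job back into memory (keys and values are the rmr helpers that pull the two sides of the pairs apart; values is used again later in this tutorial):

result = from.dfs(
  mapreduce(
    input = small.ints,
    map = function(k, v) keyval(v, v^2)))
head(values(result))   # the squares; keys(result) returns the corresponding keys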
My second mapreduce job
We've just created a simple job that was logically equivalent to a lapply but can run on big data. That job had only a map. Now to the reduce part. The closest equivalent in R is arguably a tapply. So here is the example from the R docs:

groups = rbinom(32, n = 50, prob = 0.4)
tapply(groups, groups, length)
And now the mapreduce version:

groups = to.dfs(groups)
from.dfs(
mapreduce(
input = groups,
map = function(., v) keyval(v, 1),
reduce =
function(k, vv)
keyval(k, length(vv))))
First the data is moved into HDFS with to.dfs. As we said earlier, this is not the normal way in which big data enters HDFS; that is normally the responsibility of scalable data collection systems such as Flume or Sqoop, and in that case we would just specify the HDFS path to the data as input to mapreduce. Here, though, the input is the variable groups, which contains a big data object that keeps track of where the data is and does the clean-up when the data is no longer needed. If no map function were specified, it would be set to a default that is like an identity but consistent with the map requirements, that is function(k, v) keyval(k, v). Here we do specify one: it ignores the key (the dot in its signature is just a conventional name for an unused argument) and emits each sampled value as a key paired with a count of 1.
The reduce function takes two arguments: a key and a collection of all the values associated with that key. The collection can be a vector, list, data frame or matrix, depending on what was returned by the map function; the idea is that if the user returned values of one class, that class should be preserved through the shuffle phase. As in the map case, the reduce function can return NULL, a key-value pair as generated by the function keyval, or any other object x, which is equivalent to keyval(NULL, x).
The default is no reduce, that is, the output of the map is the output of mapreduce. In this case the keys are realizations of the binomial and the values are all 1 (please note recycling in action: the single 1 is recycled against the whole vector of keys), and the only thing that matters is how many values each key has, so length gets the job done. Looking back at this second example, there are some small differences from tapply, but the overall complexity is very similar.

Word count
The word count program has become a sort of "hello world" of the mapreduce world. For a review of how the same task can be accomplished in several languages, but always with mapreduce, see this blog entry.

We are defining a function, wordcount, that encapsulates this job. This may not look like a big deal, but it is important. Our main goal was not simply to make it easy to run a mapreduce job but to make mapreduce jobs first-class citizens of the R environment and to make it easy to create abstractions based on them. For instance, we wanted to be able to assign the result of a mapreduce job to a variable (and I mean the result, not some error code or diagnostics) and to create complex expressions including mapreduce jobs. We take the first step here by creating a function that is itself a job, can be chained with other jobs, executed in a loop, etc. Let's now look at the signature.
wordcount =
function(
input,
output = NULL,
pattern = " "){
wc.map =
function(., lines) {
keyval(
unlist(
strsplit(
x = lines,
split = pattern)),
1)}
The map function takes the usual two arguments; the key is not important here, in fact it is always NULL. The value contains one or more lines of text, which get split according to a pattern. Here you can see that pattern is accessible in the mapper without any particular work on the programmer's side, according to normal R scope rules. This apparent simplicity hides the fact that the map function is executed in a different interpreter and on a different machine than the one where mapreduce is called. Behind the scenes the R environment is serialized, broadcast to the cluster and restored on each interpreter running on the nodes. For each word, a key-value pair (w, 1) is generated with keyval, and their collection is the return value of the mapper.

wc.reduce =
function(word, counts ) {
keyval(word, sum(counts))}
The reduce function takes a word as its key and the collection of that word's counts as its value, and simply returns the word with the sum of the counts, using the same helper, keyval. Finally, specifying the use of a combiner is necessary to guarantee the scalability of this algorithm. Now on to the mapreduce call.

mapreduce(
input = input ,
output = output,
input.format = "text",
map = wc.map,
reduce = wc.reduce,
combine = T)}
That is the whole wordcount function: it defines the map and reduce functions and then delegates to a single call to mapreduce. The map and reduce functions could just as well be anonymous functions, since they are used only once, but there is one advantage in naming them: rmr offers alternate backends, and in particular one can switch off Hadoop altogether with rmr.options(backend = "local"). While this is of no use for production work, as it offers no scalability, it is an amazing resource for learning and debugging, because we are dealing with a local, run-of-the-mill R program with the same semantics as when run on top of Hadoop. This, in combination with using named map and reduce functions, allows us to use debug on the mapper and the reducer in the familiar way.

The input can be an HDFS path, the return value of to.dfs or of another job, or a list thereof: potentially a mix of all three cases, as in list("a/long/path", to.dfs(...), mapreduce(...), ...). The output can be an HDFS path, but if it is NULL some temporary file will be generated and wrapped in a big data object, like the ones generated by to.dfs.
In either event, the job will return the information about the output, either the path or the big data object. Therefore, we simply pass along the input and output of the wordcount function to the mapreduce call and return whatever it returns. That way the new function also behaves like a proper mapreduce job (more details here). The input.format argument allows us to specify the format of the input. The default is based on R's own serialization functions and supports all R data types. In this case we just want to read a text file, so the "text" format will create key-value pairs with a NULL key and a line of text as value. You can easily specify your own input and output formats, and even accept and produce binary formats, with the functions make.input.format and make.output.format.
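Putting it all together, a hedged usage sketch (the file path is made up; the local backend makes it runnable without a cluster, and switching back to backend = "hadoop" runs the identical code on the cluster):

rmr.options(backend = "local")                    # switch off Hadoop for a quick test
out = from.dfs(wordcount("/tmp/some-text.txt"))   # hypothetical plain-text file
head(keys(out))                                   # the words
head(values(out))                                 # their counts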
Logistic Regression
Now on to an example from supervised learning, specifically logistic regression by gradient descent. Again we are going to create a function that encapsulates this algorithm.

logistic.regression =
  function(input, iterations, dims, alpha){

Its arguments are the input data set, the number of iterations, the dimension of the data and the learning rate alpha. Now we are going to make a slight departure from the actual order in which the code is written: the source code goes on to define the map and reduce functions, but we are going to delay their presentation slightly.

plane = t(rep(0, dims))
g = function(z) 1/(1 + exp(-z))
for (i in 1:iterations) {
gradient =
values(
from.dfs(
mapreduce(
input,
map = lr.map,
reduce = lr.reduce,
combine = T)))
plane = plane + alpha * gradient }
plane }
First we define the starting point of the search, plane, and the sigmoid function g. Then comes the main loop, where computing the gradient of the loss function is the duty of a mapreduce job whose output is brought into main memory with a call to from.dfs; any intermediate result files are managed by the system, not by you. The only important thing you need to know is that the gradient is going to fit in memory, so we can call from.dfs to get it without exceeding available resources.

lr.map =
function(., M) {
Y = M[,1]
X = M[,-1]
keyval(
1,
Y * X *
g(-Y * as.numeric(X %*% t(plane))))}
The map function computes the contribution of a block of data points to the gradient, with the variables g and plane making their necessary appearance here without any work on the developer's part. The access here is read-only, but you could even modify them if you wanted: the semantics is copy on assign, which is consistent with how R works and easily supported by Hadoop. Since in the next step we just want to add everything together, we return a dummy, constant key for each value. Note the use of recycling in keyval.

lr.reduce =
function(k, Z)
keyval(k, t(as.matrix(apply(Z,2,sum))))
The reduce function simply adds up the partial gradients; with combine = T it also doubles as the combiner, but mapreduce also accepts a distinct combiner function. Remember that a combiner's arguments can come from a map or a combine function and its return value can go to a combine or reduce function. Finally, a reminder that both map and reduce functions need to be defined inside the logistic regression function to have access to the g function and the plane vector, so cutting and pasting this code in this order won't work.

To make this example production-level there are several things one would need to do, like having a convergence criterion instead of a fixed number of iterations and an adaptive learning rate, but gradient descent probably requires too many iterations to be the right approach in a big data context anyway. Still, this example should give you all the elements needed to implement, say, conjugate gradient instead. In general, when each iteration requires I/O of a large data set, the number of iterations needs to be modest, and algorithms with an O(log(N)) number of iterations are natural candidates, even if the work in each iteration may be more substantial.
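A hedged usage sketch on synthetic data; the assumption, read off lr.map, is that the first column of the input matrix holds the label in {-1, +1} and the remaining columns hold the features:

X = matrix(rnorm(200), ncol = 2)         # 100 points in 2 dimensions
y = 2 * as.integer(rowSums(X) > 0) - 1   # labels in {-1, +1}
out =
  logistic.regression(
    to.dfs(cbind(y, X)),
    iterations = 10,
    dims = 2,
    alpha = 0.05)
out                                      # a 1 x 2 matrix of weights (no intercept in this simple version)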
K-means
We are now going to cover a simple but significant clustering algorithm, and the complexity will go up just a little bit. To cheer yourself up, you can take a look at this alternative implementation, which requires three languages (Python, Pig and Java) to get the job done and is hailed as a model of simplicity. We are talking about k-means. This is not a production-ready implementation, but it should illustrate the power of this package; you simply cannot do this in Pig or Hive alone, and it would take hundreds of lines of code in Java. Please note that the code has been slightly rearranged for presentation purposes and won't work if you cut and paste it into the R console; see kmeans.R for working code.
dist.fun =
function(C, P) {
apply(
C,
1,
function(x)
colSums((t(P) - x)^2))}
This function computes all the squared distances between a matrix of centers C and a matrix of points P very efficiently; on my laptop it can do 10^6 points and 10^2 centers in 5 dimensions in approximately 16 seconds. The only explicit iteration is over the centers (the rows of C); all the other operations are vectorized, that is, the loops are pushed down to the C library, hence the speed.

kmeans.map =
function(., P) {
nearest = {
if(is.null(C))
sample(
1:num.clusters,
nrow(P),
replace = T)
else {
D = dist.fun(C, P)
nearest = max.col(-D)}}
if(!(combine || in.memory.combine))
keyval(nearest, P)
else
keyval(nearest, cbind(1, P))}
The map function has to cover two cases, selected by the if: the first iteration, when no candidate centers are available yet, and all the following ones. Please note that while the points are stored in HDFS and provided to the map function as its second argument, the centers are simply stored in a matrix and are available in the map function because of normal scope rules. In the first iteration each point is randomly assigned to a center, whereas in the following ones a minimum-distance criterion is used. Finally, notice the vectorized use of keyval, whereby all the center-point pairs are returned in one statement (the correspondence between keys and values is positional, by row when the value is a matrix).

kmeans.reduce = {
  # without a combiner, each reduce call sees all the points assigned to one center and returns their column means, the new center
  if (!(combine || in.memory.combine))
    function(., P)
      t(as.matrix(apply(P, 2, mean)))
  # with a combiner only sums are safe to combine; the leading column of 1s added in the map counts the points, and the driver divides by it later
  else
    function(k, P)
      keyval(
        k,
        t(as.matrix(apply(P, 2, sum))))}
kmeans.mr =
function(
P,
num.clusters,
num.iter,
combine,
in.memory.combine) {
C = NULL
for(i in 1:num.iter ) {
C =
values(
from.dfs(
mapreduce(
P,
map = kmeans.map,
reduce = kmeans.reduce)))
    # when combining, the reduce returned column sums with the point count in the first column: divide to recover the means
    if(combine || in.memory.combine)
      C = C[, -1]/C[, 1]
    # if some clusters lost all their points, add centers built as random linear combinations of the surviving ones
    if(nrow(C) < num.clusters) {
C =
rbind(
C,
matrix(
rnorm(
(num.clusters -
nrow(C)) * nrow(C)),
ncol = nrow(C)) %*% C) }}
C}
# sample data: 5 widely spread points, each repeated 20 times and perturbed with standard normal noise
P =
  do.call(
    rbind,
    rep(
      list(
        matrix(
          rnorm(10, sd = 10),
          ncol = 2)),
      20)) +
  matrix(rnorm(200), ncol = 2)
kmeans.mr(
to.dfs(P),
num.clusters = 12,
num.iter = 5,
combine = FALSE,
in.memory.combine = FALSE)
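A hedged follow-up: assigning the return value lets us eyeball the recovered centers against the data with base graphics.

centers =
  kmeans.mr(
    to.dfs(P),
    num.clusters = 12,
    num.iter = 5,
    combine = FALSE,
    in.memory.combine = FALSE)
plot(P)
points(centers, col = "red", pch = 19)   # 12 centers over 5 true clusters, so several centers may share a cluster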
Linear Least Squares
This is an example of a hybrid mapreduce-conventional solution to a well-known problem. We will start with a mapreduce job that results in a smaller data set, which can then be brought into main memory and processed in a single R instance. This is straightforward in rmr because of the simple primitive that transfers data into memory, from.dfs, and because of the R-native data model. This is in contrast with hybrid Pig-Java-Python solutions, where mapping data types from one language to the other is a time-consuming and error-prone chore the developer has to deal with.

Specifically, we want to solve linear least squares (LLS) under the assumption that we have too many data points to fit in memory, but not so many variables that we need to implement the whole process as a mapreduce job. This is the basic equation we want to solve in the least squares sense:
X β = y
We are going to do it with the function solve, as in the following expression, that is, by solving the normal equations:

solve(t(X) %*% X, t(X) %*% y)

But let's assume that X is too big to fit in memory, so we have to compute the transpose and the matrix products using mapreduce, and then we can call solve as usual on the results. This is our general plan.
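For completeness, here is the standard derivation behind that expression, written in LaTeX notation: the squared error is minimized where its gradient with respect to β vanishes, which yields the normal equations.

\min_{\beta} \|X\beta - y\|^2,
\qquad
\nabla_{\beta}\,\|X\beta - y\|^2 = 2\,X^{\top}(X\beta - y) = 0
\;\Longrightarrow\;
X^{\top}X\,\beta = X^{\top}y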
Let's get some data first: a potentially big data matrix X and a regular vector y.

X = matrix(rnorm(2000), ncol = 10)
X.index = to.dfs(cbind(1:nrow(X), X))   # prepend a row index so each chunk of X knows which entries of y it matches
y = as.matrix(rnorm(200))
# reducer (and combiner) that adds up a list of conformable matrices under a single key
Sum =
  function(., YY)
    keyval(1, list(Reduce('+', YY)))
XtX =
values(
from.dfs(
mapreduce(
input = X.index,
map =
function(., Xi) {
yi = y[Xi[,1],]
Xi = Xi[,-1]
keyval(1, list(t(Xi) %*% Xi))},
reduce = Sum,
combine = TRUE)))[[1]]
Xty =
values(
from.dfs(
mapreduce(
input = X.index,
map = function(., Xi) {
yi = y[Xi[,1],]
Xi = Xi[,-1]
keyval(1, list(t(Xi) %*% yi))},
reduce = Sum,
combine = TRUE)))[[1]]
With the two cross products in memory, the last step is an ordinary call to solve:

solve(XtX, Xty)
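As a hedged sanity check, possible only because this toy X happens to fit in memory, the result can be compared with an ordinary in-memory fit:

beta.mr = solve(XtX, Xty)
beta.lm = coef(lm(y ~ X - 1))   # plain least squares without intercept on the in-memory copies
max(abs(beta.mr - beta.lm))     # should be tiny, up to numerical error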