The plyrmr tutorial

Tutorial

Predefined operations

Let's start with a simple operation such as adding a column to a data frame. The data set mtcars comes with R and contains specification and performance data about a few car models:
mtcars
                     mpg cyl  disp  hp vs gear carb
Mazda RX4           21.0   6 160.0 110  0    4    4
Mazda RX4 Wag       21.0   6 160.0 110  0    4    4
Datsun 710          22.8   4 108.0  93  1    4    1
Hornet 4 Drive      21.4   6 258.0 110  1    3    1
Hornet Sportabout   18.7   8 360.0 175  0    3    2
Valiant             18.1   6 225.0 105  1    3    1
Duster 360          14.3   8 360.0 245  0    3    4
....
One may be interested in how many carburetors per cylinder each model uses, and that's a simple bind.cols call away:
bind.cols(mtcars, carb.per.cyl = carb/cyl)
                     mpg cyl  disp  hp vs gear carb carb.per.cyl
Mazda RX4           21.0   6 160.0 110  0    4    4       0.6667
Mazda RX4 Wag       21.0   6 160.0 110  0    4    4       0.6667
Datsun 710          22.8   4 108.0  93  1    4    1       0.2500
Hornet 4 Drive      21.4   6 258.0 110  1    3    1       0.1667
Hornet Sportabout   18.7   8 360.0 175  0    3    2       0.2500
Valiant             18.1   6 225.0 105  1    3    1       0.1667
Duster 360          14.3   8 360.0 245  0    3    4       0.5000
....
bind.cols is plyrmr's own version of transform and follows a model that is common to many functions in plyr and plyrmr. The function name gives a general idea of what the function is for. The first argument is always the data set to be processed. The following arguments provide the details of what type of processing is going to take place, in the form of one or more optionally named expressions. These expressions can refer to the columns of the data frame as if they were additional variables, according to non standard evaluation rules. Now let's imagine that we have a huge data set with the same structure but instead of being stored in memory, it is stored in a HDFS file named "/tmp/mtcars". It's way too big to be loaded with read.table or equivalent. With plyrmr one just needs to enter:
bind.cols(input("/tmp/mtcars"), carb.per.cyl = carb/cyl)
                     mpg cyl  disp  hp vs gear carb carb.per.cyl
Mazda RX4           21.0   6 160.0 110  0    4    4       0.6667
Mazda RX4 Wag       21.0   6 160.0 110  0    4    4       0.6667
Datsun 710          22.8   4 108.0  93  1    4    1       0.2500
Hornet 4 Drive      21.4   6 258.0 110  1    3    1       0.1667
Hornet Sportabout   18.7   8 360.0 175  0    3    2       0.2500
Valiant             18.1   6 225.0 105  1    3    1       0.1667
Duster 360          14.3   8 360.0 245  0    3    4       0.5000
....
What we see are only a few arbitrary rows from the resulting data set. This is not only a consequence of the limited screen real estate, but also, in the case of large data sets, of the capacity gap between memory of a single machine and big data. In general, we can't expect to be able to load big data in memory. Sometimes, after summarization or filtering, the result of processing big data is small enough to fit into main memory. In this example, we know the data set is small so we can just go ahead and enter:
as.data.frame(bind.cols(input("/tmp/mtcars"), carb.per.cyl = carb/cyl))
                     mpg cyl  disp  hp vs gear carb carb.per.cyl
Mazda RX4           21.0   6 160.0 110  0    4    4       0.6667
Mazda RX4 Wag       21.0   6 160.0 110  0    4    4       0.6667
Datsun 710          22.8   4 108.0  93  1    4    1       0.2500
Hornet 4 Drive      21.4   6 258.0 110  1    3    1       0.1667
Hornet Sportabout   18.7   8 360.0 175  0    3    2       0.2500
Valiant             18.1   6 225.0 105  1    3    1       0.1667
Duster 360          14.3   8 360.0 245  0    3    4       0.5000
....
If we can't make this assumption, we may need to write the results of a computation out to a specific path, that is we need the output call:
output(
    bind.cols(
        input("/tmp/mtcars"), 
        carb.per.cyl = carb/cyl), 
    "/tmp/mtcars.out")
                     mpg cyl  disp  hp vs gear carb carb.per.cyl
Mazda RX4           21.0   6 160.0 110  0    4    4       0.6667
Mazda RX4 Wag       21.0   6 160.0 110  0    4    4       0.6667
Datsun 710          22.8   4 108.0  93  1    4    1       0.2500
Hornet 4 Drive      21.4   6 258.0 110  1    3    1       0.1667
Hornet Sportabout   18.7   8 360.0 175  0    3    2       0.2500
Valiant             18.1   6 225.0 105  1    3    1       0.1667
Duster 360          14.3   8 360.0 245  0    3    4       0.5000
....
This is the real deal: we have performed a computation on the cluster, in parallel, and the data is never loaded into memory at once, but the syntax and semantics remain the familiar ones. The last run processed all of 32 rows, but on a large enough cluster it could run on 32 terabytes — in that case you can not use as.data.frame. Even if output appears to return the data to be printed, that's only a sampling. The main effect of the output call is to write out to the specified file.
bind.cols is one of several functions that plyrmr provides in a Hadoop-powered version:
  • data manipulation:
    • bind.cols: add new columns
    • select: select columns
    • where: select rows
    • transmute: all of the above plus summaries
  • from reshape2:
    • melt and dcast: convert between long and wide data frames
  • summary:
    • count
    • quantile
    • sample
  • extract
    • top.k
    • bottom.k

Why does plyrmr have bind.cols, transmute and where instead of transform, summarize and subset, a.k.a why isn't this part of dplyr already?

The main goal of plyrmr if providing big-data close equivalents of well known and useful data frame manipulations and in fact an early design did not define any new functions for data frames. So why try to reinvent the wheel with bind.cols, transmute and where? The main reason is that transform, mutate & C. work best interactively, at the prompt, but they have some problems when used in other functions or packages. The evaluation of arguments can break, and the reason is very technical and covered in another document. But most recently we've been able to overcome this problem at least for select, so why not go the whole nine yards? First of all, we need multi-row summaries. These are not possible in either plyr or dplyr as of this writing (there is an issue open about this, so things may change). Multi-row summaries are extremely important in statistics (quantiles, sketches etc). Next is support for list columns, which are needed for things like models, see the last section. Third, we believe that accurate naming in APIs is not a detail, and names like transform or mutate are extremely vague and not self-documenting. Finally, we believe in friendly defaults born of real use cases, unlike those in functions that silently drop unnamed arguments, such as, again, transform. Functions in plyrmr try a little harder to be helpful and, in that specific case, make up reasonable names. It's possible that as dplyr matures we will buy into that API more extensively. Already today, you can use a magic.wand (see below) to give Hadoop powers to many functions in dplyr.

Combining Operations

What if none of the basic operations is sufficient to perform a needed data processing step? The first available tool is to combine different operations. Going back to the previous example, let's say we want to select cars with a carburetor per cylinder ratio greater than 1. Do such things even exist? On a data frame, there is a quick way to compute the answer, which is
where(
    bind.cols(
        mtcars, 
        carb.per.cyl = carb/cyl), 
    carb.per.cyl >= 1)
               mpg cyl disp  hp vs gear carb carb.per.cyl
Ferrari Dino  19.7   6  145 175  0    5    6            1
Maserati Bora 15.0   8  301 335  0    5    8            1
Wouldn't it be nice if we could do exactly the same on a Hadoop data set? In fact, we almost can:
where(
    bind.cols(
        input("/tmp/mtcars"),
        carb.per.cyl = carb/cyl),
    carb.per.cyl >= 1)
               mpg cyl disp  hp vs gear carb carb.per.cyl
Ferrari Dino  19.7   6  145 175  0    5    6            1
Maserati Bora 15.0   8  301 335  0    5    8            1
The main differences between the data frame version and the Hadoop data version are the input and the output. All there is in between, pretty much works the same.

The pipe operator

You may have noticed that the last example consists of a fairly complex expression, with function calls nested inside other function calls multiple times. The drawbacks of that are twofold. First, the order in which functions appear in the code, top to bottom, is the opposite of the order in which they are executed. Second, additional arguments to each function can be very far from the name of the function. This problem can be mitigated with proper indentation, but it still is a problem. One workaround is to rewrite complex expressions as chains of assignments:
x = bind.cols(mtcars, carb.per.cyl = carb/cyl) 
where(x, carb.per.cyl >= 1)
               mpg cyl disp  hp vs gear carb carb.per.cyl
Ferrari Dino  19.7   6  145 175  0    5    6            1
Maserati Bora 15.0   8  301 335  0    5    8            1
The purists will find that introducing one variable for each intermediate step quite unsightly. To avoid this plyrmr offers a UNIX-style pipe operator %|%.
mtcars %|%
    bind.cols(carb.per.cyl = carb/cyl) %|%
    where(carb.per.cyl >= 1)
               mpg cyl disp  hp vs gear carb carb.per.cyl
Ferrari Dino  19.7   6  145 175  0    5    6            1
Maserati Bora 15.0   8  301 335  0    5    8            1
This operator provides the value of the leftmost expression as the first unnamed argument of the next function call and evaluates it. When multiple operators are chained, they associate to the left. If the first argument is not the right one, you can specify any function argument with the special variable .. as in 2 %|% rnorm(2,..) Rather than arguing over which style is superior, it's probably best to bask in the flexibility made possible by the R language and pick the one that fits your style or a specific situation. In particular, pipes can not express more complex data flows where two flows merge or one splits. In the following I will alternate between these three notations (nested, assignment chain and pipe operator) based on which seems the clearest. It should be safe to assume that each example can be translated into any of the three.

Custom operations

Another way to extend the functionality of plyrmr built-in data manipulation functions is to take any function that accepts a data frame in input and returns a data frame and use the function gapply to give it Hadoop powers. For instance, you have a function that returns the rightmost column of a data frame. This is not simple to achieve with the functions explored so far, but it is a quick one liner:
last.col = function(x) x[, ncol(x), drop = FALSE]
Wouldn't it be great if we could run this on a Hadoop data set? Well, we almost can:
gapply(input("/tmp/mtcars"), last.col)
                    carb
Mazda RX4              4
Mazda RX4 Wag          4
Datsun 710             1
Hornet 4 Drive         1
Hornet Sportabout      2
Valiant                1
Duster 360             4
....
gapply takes any function that accepts and returns data frames, executes it on a Hadoop data set in parallel on relatively small chunks of the data and passes the results to as.data.frame or output which send them to their final destination. Wouldn't it absolutely perfect if the lastcol function itself knew whether it's working on a Hadoop data set or a data frame and do the right thing? It actually is possible:
magic.wand(last.col)
NULL
last.col(mtcars)
                    carb
Mazda RX4              4
Mazda RX4 Wag          4
Datsun 710             1
Hornet 4 Drive         1
Hornet Sportabout      2
Valiant                1
Duster 360             4
....
last.col(input("/tmp/mtcars"))
                    carb
Mazda RX4              4
Mazda RX4 Wag          4
Datsun 710             1
Hornet 4 Drive         1
Hornet Sportabout      2
Valiant                1
Duster 360             4
....
For people familiar with object oriented programming in R, this function takes an existing data frame function, meaning one with a data frame as its first argument and return value, and creates a generic function by the same name, with a method for data frames equal to the original function and one for big data sets using do as shown above. The internal R dispatch machinery decides which of the methods to call based on the class of the first argument. If dplyr is your style, you can keep using it on Hadoop data calling magic.wand(mutate, TRUE) or magic.wand(filter, TRUE). The optional second and third arguments to magic.wand help the function process its arguments in the way appropriate for that function, more details in help(magic.wand).

Grouping

Until now we performed row by row operations, whereby each row in the results depends on a single row in the input. In this case we don't care if the data is grouped in one way or another. In most other cases, this distinction is important. For instance, if we wanted to compute the total number of carburetors, we could enter:
mtcars %|% transmute(sum(carb))
  sum.carb.
1        90
What happens if we do this on a Hadoop data set?
input("/tmp/mtcars3", format = if3) %|%
    transmute(sum(carb)) 
    sum.carb.
1          10
1.1        29
1.2        19
1.3        19
1.4        13
That's not what we wanted and that's the where the size of the data cannot be ignored or abstracted away. Think of data in Hadoop as always grouped, one way or another. It couldn't be otherwise: it is stored on multiple devices and, even if it weren't, we can only load it into memory in small chunks. In this specific example, the data is small and to highlight this problem I created an input format that reads the data in unreasonably small chunks, but in Hadoop applications this is the norm. So think of the data as always grouped, initially in arbitrary fashion and later in the way we determine using the functions group, group.f, gather and more. These were inspired by the notion of key in mapreduce, the SQL statement and the dplyr functions with similar names. In this case, we computed partial sums for each of the arbitrary groups — here set to a very small size to make the point. Instead we want to group everything together so we can enter:
input("/tmp/mtcars3", format = if3) %|%
    gather %|%
    transmute(sum(carb), .mergeable = TRUE)
  sum.carb.
1        90
You may have noticed the contradiction between the above statement that data is always in chunks with the availability of a gather function. Luckily, there is a way of grouping recursively, in a tree like fashion, that only works with associative and commutative operations such as the sum, which is enabled by the .mergeable argument and makes gather possible. Anyway, it will all be more clear as we cover other grouping functions.
The group function takes an input and a number of arguments that are evaluated in the context of the data, exactly like bind.cols. The result is a Hadoop data set grouped by the columns defined in those arguments. After this step, all rows that are identical on the columns defined in the group call will be loaded into memory at once and processed in the same call. Here is an example. Let's say we want to calculate the average mileage for cars with the same number of cylinders:
input("/tmp/mtcars") %|%
    group(cyl) %|%
    transmute(mean.mpg = mean(mpg))
    cyl mean.mpg
1     6    19.74
1.1   4    26.66
1.2   8    15.10
This is mostly a scalable programs, but there are some caveats: we need to be mindful of the size of the groups. If they are very big they will bust memory limits, so we need to reach for some advanced techniques to avoid this problem. If they are very small, like a handful of rows, we may run into some efficiency issues related to the current R and rmr2 implementations rather than fundamental (so there is hope they will go away one day).
When the definition of the grouping column is more complicated, we may need to reach for the uber-general group.f, the grouping relative of gapply (in fact, these two functions are the foundation for everything else in plyrmr). Let's go back to the last.col example. If we need to group by the last columns of a data frame, this is all we need to do:
input("/tmp/mtcars") %|%
    group.f(last.col) %|%
    transmute(mean.mpg = mean(mpg)) 
    carb mean.mpg
1      4    15.79
1.1    1    25.34
1.2    2    22.40
1.3    3    16.30
1.4    6    19.70
1.5    8    15.00

Better than SQL

Despite the SQL-ish flavor and undeniable SQL inspiration for some of these operations, we want to highlight a few ways in which plyrmr is much more powerful than SQL. The first is that summaries or aggregation don't need to be limited to a single row. One form of aggregation are summaries and summaries can have many elements, even thousands. Momenta, quantiles, histograms, samples, they all have multiple entries. You could represent them as multiple columns up to a certain size, but removing the SQL limitation on aggregations is a good thing. Let's say you want to examine the quantiles of the gas mileage data in each group of cars with the same number of carburetors
input("/tmp/mtcars") %|%
    group(carb) %|%
    quantile 
       carb   mpg cyl   disp    hp  vs gear
0%        4 10.40   6 160.00 110.0 0.0  3.0
25%       4 13.55   6 167.60 123.0 0.0  3.0
50%       4 15.25   8 350.50 210.0 0.0  3.5
75%       4 18.85   8 420.00 241.2 0.0  4.0
100%      4 21.00   8 472.00 264.0 1.0  5.0
0%.1      1 18.10   4  71.10  65.0 1.0  3.0
25%.1     1 21.45   4  78.85  66.0 1.0  3.0
....
And what to say about working in a real programming language, and one with an unmatched library of statistical methods for good measure? You know how many aggregate functions ANSI SQL 92 has? 5, according to my references. What if you wanted to compute a linear model for each group? Forget it, or write some extension against a DBMS-specific API in some vendor-selected language. Not so with plyrmr:
models = 
    input("/tmp/mtcars") %|%
    group(carb) %|%
    transmute(model = list(lm(mpg~cyl+disp))) %|%
    as.data.frame
models
    carb        model
1      4 c(22.693....
1.1    1 c(9.2859....
1.2    2 c(32.723....
1.3    3 c(16.3, ....
1.4    6 c(19.7, ....
1.5    8 c(15, NA....
This may not look very familiar at first, but if we access one cell we will find a tried and true linear model.
models[1,2]
[[1]]

Call:
lm(formula = mpg ~ cyl + disp)

Coefficients:
(Intercept)          cyl         disp  
     22.694        0.329       -0.030  
....
This can be used for prediction, whether in another plyrmr program or locally.

No comments:

Post a Comment