http://grokbase.com/p/gg/rhadoop/132k8h0901/547-toy-example-from-airplane-data-tutorial-throwing-errors
[RHadoop:#396] Toy example from airplane data tutorial throwing errors.
I think this is it:
Loading required package: rmr2
Loading required package: Rcpp
Loading required package: RJSONIO
Loading required package: methods
Loading required package: digest
Loading required package: functional
Loading required package: stringr
Loading required package: plyr
Error in aggregate.data.frame(vv, by = vv[, c("carb", "cyl")], mean) :
no rows to aggregate
Calls: <Anonymous> ... withVisible -> eval -> eval -> reduce -> aggregate.data.frame
Execution halted
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:576)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:530)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
log4j:WARN No appenders could be found for logger (org.apache.hadoop.hdfs.DFSClient).
log4j:WARN Please initialize the log4j system properly.
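
For reference, the R-side failure above can be reproduced without Hadoop: aggregate.data.frame() stops with "no rows to aggregate" whenever it is handed a zero-row data frame, which suggests the reducer received an empty vv for some key (a minimal sketch, independent of rmr2):

# zero-row data frame with the same columns as mtcars
vv <- mtcars[0, ]
aggregate.data.frame(vv, by = vv[, c("carb", "cyl")], mean)
# Error in aggregate.data.frame(vv, by = vv[, c("carb", "cyl")], mean) :
#   no rows to aggregate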
On Tuesday, February 19, 2013 3:43:58 PM UTC-5, Antonio Piccolboni wrote:
Can I see the stderr of the R process? jobtracker web UI, click on job ->
number of failed jobs -> logs
Thanks
Antonio
On Tue, Feb 19, 2013 at 12:36 PM, Jim Solderitsch <jsolde...@gmail.com> wrote:
I tried this within RStudio running in a VM that also contains Hortonworks Hadoop. I get this result:

from.dfs(
  mapreduce(
    to.dfs(mtcars),
    map = function(k, v) keyval(v[, c('carb', 'cyl')], v),
    reduce = function(k, vv)
      aggregate.data.frame(vv, by = vv[, c("carb", "cyl")], mean)))

13/01/31 10:41:45 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/01/31 10:41:45 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/01/31 10:41:45 INFO compress.CodecPool: Got brand-new compressor
packageJobJar: [/tmp/RtmpRXAMpx/rmr-local-env6093222f1f76, /tmp/RtmpRXAMpx/rmr-global-env60935e1a3370, /tmp/RtmpRXAMpx/rmr-streaming-map609379a3967b, /tmp/RtmpRXAMpx/rmr-streaming-reduce60935c8f64be, /tmp/hadoop-sandbox/hadoop-unjar5497485596887405011/] [] /tmp/streamjob3341665760954785005.jar tmpDir=null
13/01/31 10:41:48 INFO mapred.FileInputFormat: Total input paths to process : 1
13/01/31 10:41:48 INFO streaming.StreamJob: getLocalDirs(): [/hadoop/mapred]
13/01/31 10:41:48 INFO streaming.StreamJob: Running job: job_201301300407_0082
13/01/31 10:41:48 INFO streaming.StreamJob: To kill this job, run:
13/01/31 10:41:48 INFO streaming.StreamJob: /usr/lib/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=sandbox:50300 -kill job_201301300407_0082
13/01/31 10:41:48 INFO streaming.StreamJob: Tracking URL: http://sandbox:50030/jobdetails.jsp?jobid=job_201301300407_0082
13/01/31 10:41:49 INFO streaming.StreamJob: map 0% reduce 0%
13/01/31 10:42:01 INFO streaming.StreamJob: map 100% reduce 0%
13/01/31 10:42:09 INFO streaming.StreamJob: map 100% reduce 67%
13/01/31 10:42:14 INFO streaming.StreamJob: map 100% reduce 0%
13/01/31 10:42:22 INFO streaming.StreamJob: map 100% reduce 33%
13/01/31 10:42:29 INFO streaming.StreamJob: map 100% reduce 0%
13/01/31 10:42:37 INFO streaming.StreamJob: map 100% reduce 33%
13/01/31 10:42:42 INFO streaming.StreamJob: map 100% reduce 0%
13/01/31 10:42:50 INFO streaming.StreamJob: map 100% reduce 33%
13/01/31 10:42:56 INFO streaming.StreamJob: map 100% reduce 0%
13/01/31 10:42:58 INFO streaming.StreamJob: map 100% reduce 100%
13/01/31 10:42:58 INFO streaming.StreamJob: To kill this job, run:
13/01/31 10:42:58 INFO streaming.StreamJob: /usr/lib/hadoop/libexec/../bin/hadoop job -Dmapred.job.tracker=sandbox:50300 -kill job_201301300407_0082
13/01/31 10:42:58 INFO streaming.StreamJob: Tracking URL: http://sandbox:50030/jobdetails.jsp?jobid=job_201301300407_0082
13/01/31 10:42:58 ERROR streaming.StreamJob: Job not successful. Error: # of failed Reduce Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201301300407_0082_r_000000
13/01/31 10:42:58 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, in.folder = if (is.list(input)) { :
  hadoop streaming failed with error code 1

BTW, this is similar to some of the failed execution steps I saw when I was trying my earlier experiments.
Any ideas?
Jim
On Thursday, February 14, 2013 6:46:39 PM UTC-5, Antonio Piccolboni wrote:
But why, why does it have to be so complicated? I don't have the airline data set handy, but let's assume that you figured out the input format and it is read in as a data frame with some colnames. So let's try with mtcars, which is also a data frame:

from.dfs(
  mapreduce(
    to.dfs(mtcars),
    map = function(k, v) keyval(v[, c('carb', 'cyl')], v),
    reduce = function(k, vv)
      aggregate.data.frame(vv, by = vv[, c("carb", "cyl")], mean)))

Now replace the input, add the input format, replace the col names with whatever you want to aggregate on, and off you go. What am I missing?
Antonio
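
Spelling out that substitution against the airline layout (a sketch only: the HDFS path 'airline/data' is hypothetical and the column list is abbreviated; the full 29-column layout appears in the scripts below):

library(rmr2)

# abbreviated, illustrative column list
airline.format = make.input.format('csv', sep = ',',
  col.names = c('Year', 'UniqueCarrier', 'ArrDelay', 'DepDelay'))

from.dfs(
  mapreduce(
    'airline/data',                      # hypothetical HDFS input directory
    input.format = airline.format,
    map    = function(k, v) keyval(v[, c('Year', 'UniqueCarrier')], v),
    reduce = function(k, vv)
      aggregate.data.frame(vv[, c('ArrDelay', 'DepDelay')],
                           by = vv[, c('Year', 'UniqueCarrier')], mean)))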
On Thursday, February 14, 2013 3:03:09 PM UTC-8, Jim Solderitsch wrote:
I can't say where you went wrong, but I have a different approach that seems to work for me. I can't get vectorized keys and values to work; reduce seems to get itself into an infinite loop when I do. But when I concatenate values into strings with paste and then strsplit them in reduce, I get good results. And the conversion to a data frame works well too. BUT my version uses the csv input format, and currently rmr2 disallows headers in the source file in this case.
Anyway, here is what I came up with. Here I just want to average Arrival Delays and Departure Delays.
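
In outline, the workaround is a string round trip; a distilled sketch of the pattern used in the full script below (the function names map.fn/reduce.fn are just for illustration):

library(rmr2)
library(plyr)   # for ldply in the reduce step

# map: pack the two delay fields into a single comma-separated string value
map.fn = function(k, fields)
  keyval(as.character(fields$UniqueCarrier),
         paste(fields$ArrDelay, fields$DepDelay, sep = ','))

# reduce: split the strings back out and coerce to numeric columns
reduce.fn = function(key, val.list) {
  val.df = ldply(strsplit(unlist(val.list), split = ','), as.numeric)
  keyval(key, paste(nrow(val.df),
                    mean(val.df[, 1], na.rm = TRUE),
                    mean(val.df[, 2], na.rm = TRUE), sep = ','))
}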
#!/usr/bin/env Rscript
#
# Example 2: airline
#
# Calculate average enroute times by year and market (= airport pair)
# from the airline data set (http://stat-computing.org/dataexpo/2009/the-data.html).
# Requires rmr package (https://github.com/RevolutionAnalytics/RHadoop/wiki).
#
# by Jeffrey Breen <jef...@jeffreybreen.com>
#

library(rmr2)
library(plyr)

# assumes 'airline' and airline/data exist on HDFS under user's home directory
# (e.g., /user/cloudera/airline/ & /user/cloudera/airline/data/)
hdfs.data.root = 'airline'
hdfs.data = file.path(hdfs.data.root, 'data')

# unless otherwise specified, directories on HDFS should be relative to user's home
hdfs.out.root = hdfs.data.root
hdfs.out = file.path(hdfs.out.root, 'out')

# the mapper gets a key and a value vector generated by the formatter
# in our case, the key is NULL and all the field values come in as a vector
#
mapper.year.market.enroute_time = function(k, fields) {
  # if (as.character(fields$Origin) < as.character(fields$Dest))
  #   market = paste(as.character(fields$Origin), as.character(fields$Dest), sep='-')
  # else
  #   market = paste(as.character(fields$Dest), as.character(fields$Origin), sep='-')
  # key consists of year, market
  # output.key = paste(market, as.character(fields$UniqueCarrier), sep='-')
  # output.key = paste(as.character(fields$Origin), as.character(fields$UniqueCarrier), sep='-')
  # keyval(output.key, fields$DepDelay)
  keyval(as.character(fields$UniqueCarrier),
         paste(fields$ArrDelay, fields$DepDelay, sep=","))
  # list(c(fields$ArrDelay, fields$DepDelay)))
}

#
# the reducer gets all the values for a given key
# the values (which may be multi-valued as here) come in the form of a list()
#
reducer.year.market.enroute_time = function(key, val.list) {
  delays.list = strsplit(unlist(val.list), split=",")
  val.df = ldply(delays.list, as.numeric)
  # val.df = ldply(val.list, as.numeric)
  colnames(val.df) = c('arrDelay', 'depDelay')
  output.val = paste(nrow(val.df),
                     mean(val.df$arrDelay, na.rm=TRUE),
                     mean(val.df$depDelay, na.rm=TRUE), sep=",")
  keyval(key, output.val)
}

mr.year.market.enroute_time = function(input, output) {
  mapreduce(input = input,
            output = output,
            input.format = make.input.format("csv", sep = ",",
              col.names = c('Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime',
                            'ArrTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum',
                            'ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay',
                            'DepDelay','Origin','Dest','Distance','TaxiIn','TaxiOut',
                            'Cancelled','CancellationCode','Diverted','CarrierDelay',
                            'WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay')),
            output.format = make.output.format("csv", quote=FALSE, sep=",", mode="text"),
            map = mapper.year.market.enroute_time,
            reduce = reducer.year.market.enroute_time)
}

csvFormat = make.input.format("csv", sep = ",", mode="text")
out = from.dfs(mr.year.market.enroute_time(hdfs.data, hdfs.out), format=csvFormat)
results.df = as.data.frame(out$val, stringsAsFactors=F)
colnames(results.df) = c('carrier', 'num flights', 'mean arr delay', 'mean dep delay')
print(results.df)
On Sunday, February 3, 2013 11:58:36 AM UTC-5, Piers Harding wrote:
Hi -
I've just been working through a problem (most likely my understanding) with the same code. The attached is a working example upgraded to rmr2, but where it is less than satisfactory is the solution I've had to come up with for the output.key of the map, and the final reformatting of the data out of from.dfs().
Would really appreciate it if you could point out where I've gone wrong.
Thanks,
Piers Harding.
# adapted from Jeffrey Breen's - http://jeffreybreen.wordpress.com/2012/03/10/big-data-step-by-step-slides/
# sample data taken from http://stat-computing.org/dataexpo/2009/the-data.html

# sort out loss of env with RStudio
Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
Sys.setenv(HADOOP_HOME="/usr/share/hadoop")

# switch on to run local - no Hadoop
DEBUG=FALSE

library(rmr2)
library(rhdfs)
library(reshape2)

sample_data = '/home/piers/Downloads/dataexpo/tmp/small.csv'

# DEBUG = run locally - ie. you don't need Hadoop, which is pretty cool
if (DEBUG) {
  rmr.options(backend = "local")
  hdfs.input.path = sample_data
  hdfs.output.root = '/tmp/enroute-time'
  if (file.exists(hdfs.output.root)) {
    unlink(hdfs.output.root, recursive = TRUE)
  }
  dir.create(hdfs.output.root)
} else {
  # set up input and output directories and files under HDFS and PUT up the data
  rmr.options(backend = "hadoop")
  hdfs.input.dir = 'asa-airline'
  hdfs.input.path = file.path(hdfs.input.dir, 'data')
  hdfs.output.root = 'asa-airline/out'
  hdfs.init()
  if (hdfs.exists(hdfs.input.dir)) {
    hdfs.delete(hdfs.input.dir)
  }
  hdfs.mkdir(hdfs.input.dir)
  hdfs.put(sample_data, hdfs.input.path)
  if (hdfs.exists(hdfs.output.root)) {
    hdfs.delete(hdfs.output.root)
  }
  hdfs.mkdir(hdfs.output.root)
}

# file input formatter
asa.csv.text.input.format = make.input.format(mode = 'text', format = function(con, klen) {
  # read a line and parse it into fields
  if (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    values = unlist( strsplit(line, "\\,") )
    names(values) = c('Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime',
                      'ArrTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum',
                      'ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay',
                      'DepDelay','Origin','Dest','Distance','TaxiIn','TaxiOut',
                      'Cancelled','CancellationCode','Diverted','CarrierDelay',
                      'WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay')
    # stop those NAs propagating
    values[values == 'NA'] <- 0
    return( keyval(NULL, values) )
  }
  else {
    return(NULL)
  }
} )

# mapper - grab the time fields I want, and build a key of Year + market
mapper.year.market.enroute_time = function(key, val) {
  # Skip header lines, cancellations, and diversions:
  if ( !identical(as.character(val['Year']), 'Year')) {
    if (identical(as.numeric(val['Cancelled']), 0)
        & identical(as.numeric(val['Diverted']), 0) ) {
      # We don't care about direction of travel, so construct 'market'
      # with airports ordered alphabetically
      # (e.g., LAX to JFK becomes 'JFK-LAX')
      if (val['Origin'] < val['Dest'])
        market = paste(val['Origin'], val['Dest'], sep='-')
      else
        market = paste(val['Dest'], val['Origin'], sep='-')
      # key consists of year, market - I would love to know a better way of making compound keys - XXX
      output.key = paste(val['Year'], market, sep=':')
      # output gate-to-gate elapsed times (CRS and actual) + time in air
      output.val = list(c(val['CRSElapsedTime'], val['ActualElapsedTime'], val['AirTime']))
      return( keyval(output.key, output.val) )
    }
  }
  return(NULL)
}

# reducer - fix numeric types and calculate the mean times
reducer.year.market.enroute_time = function(key, val.list) {
  # val.list is a list of row vectors
  # a data.frame is a list of column vectors
  require(plyr)
  val.df = ldply(val.list, as.numeric)
  colnames(val.df) = c('actual','crs','air')
  output.key = key
  output.val = c( nrow(val.df), mean(val.df$actual, na.rm=T),
                  mean(val.df$crs, na.rm=T),
                  mean(val.df$air, na.rm=T) )
  return( keyval(output.key, output.val) )
}

# mapreduce job
mr.year.market.enroute_time = function(input, output) {
  mapreduce(input = input,
            output = output,
            input.format = asa.csv.text.input.format,
            map = mapper.year.market.enroute_time,
            reduce = reducer.year.market.enroute_time,
            verbose=T)
}

hdfs.output.path = file.path(hdfs.output.root, 'enroute-time')
results.df = from.dfs(mr.year.market.enroute_time(hdfs.input.path, hdfs.output.path))

# now - all of this fixing up is because of the key problem and loss of structure
# in the data (ie: I want a data.frame back with vectors for keys, and output values) - what do I do?
results.df <- data.frame(colsplit(unique(results.df$key), ":", list('year', 'market')),
                         as.data.frame(matrix(results.df$val, nrow=length(results.df$val)/4,
                                              ncol=4, byrow=TRUE)))
colnames(results.df) = c('year', 'market', 'flights', 'scheduled', 'actual', 'in.air')
str(results.df)
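
On the compound-key question flagged XXX in the mapper above: rmr2 also accepts a data frame as the key, as in Antonio's mtcars example further up, which keeps year and market as separate columns and avoids the paste()/colsplit() round trip. A toy sketch, assuming data-frame keys group the way that example implies:

library(rmr2)

# a two-column data frame as the key = a compound key without string packing
flights = data.frame(year   = c(2004, 2004, 2005),
                     market = c('BOS-MIA', 'BOS-MIA', 'JFK-LAX'),
                     air    = c(197, 203, 310))
out = from.dfs(
  mapreduce(
    to.dfs(flights),
    map    = function(k, v) keyval(v[, c('year', 'market')], v$air),
    reduce = function(k, vv) keyval(k, mean(vv))))
# keys(out) comes back as a data frame with year and market columns,
# values(out) as the per-market means -- no reshaping needed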
On Friday, 23 November 2012 06:13:20 UTC+13, Antonio Piccolboni wrote:
It looks to me like you made one crucial change in the first line, so now you have to upgrade the code as well. rmr2 is not backward compatible; I think that fact has not been a secret. We even changed the package name to make it impossible to run old programs after an upgrade without thinking about it.
Antonio
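
The crucial first-line change is library(rmr) becoming library(rmr2), and with it the keyval model: rmr2 vectorizes keys and values, so a reducer written for rmr 1.x, which received all values for a key as a list (as in the val.list reducers earlier in this thread), no longer matches what rmr2 delivers. A rough before/after sketch (the 1.x fragment is from memory and may not match every 1.x release):

## rmr 1.x style: reduce got a list of values
# reduce = function(key, val.list)
#   keyval(key, mean(unlist(lapply(val.list, as.numeric))))

## rmr2 style: values arrive already stacked as a vector or data frame slice
library(rmr2)
reduce = function(key, vv) keyval(key, mean(as.numeric(vv)))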
On Thu, Nov 22, 2012 at 2:44 PM, Robert Ness wrote:
I am trying to implement the example code for the Airline dataset given in Jeffrey Breen's tutorial found here: http://www.slideshare.net/jeffreybreen/big-data-stepbystep-using-r-hadoop-with-rhadoops-rmr-package. I haven't been successful.
In my input directory I have a csv file with lines that look like this:

2004,3,25,4,1445,1437,1820,1812,AA,399,N275AA,215,215,197,8,8,BOS,MIA,1258,6,12,0,,0,0,0,0,0,0

I am using only one input file; it is a csv containing only a subset of the true dataset -- only about 5,000 lines.
I went through the code within the formatting, map, and reduce functions with example inputs and it all works fine.
When I run the code I get the error:

Error in mr(map = map, reduce = reduce, combine = combine, in.folder = if (is.list(input)) { :
  hadoop streaming failed with error code 5
In addition: Warning message:
In mapreduce(input = input, output = output, input.format = asa.csvtextinputformat, :
  backend.parameters is deprecated.

Can someone tell me what I am missing? Thanks. Code below.
library(rmr2)

hdfs.input.path <- "/usr/cloudera/airline/data/in"

asa.csvtextinputformat <- make.input.format(format = function(line){
  values <- unlist(strsplit(line, "\\,"))
  names(values) <- c("Year", "Month", "DayofMonth", "DayofWeek", "DepTime",
                     "CRSDepTime", "ArrTime", "CRSArrTime", "UniqueCarrier",
                     "FlightNum", "TailNum", "ActualElapsedTime", "CRSElapsedTime",
                     "AirTime", "ArrDelay", "DepDelay", "Origin", "Dest", "Distance",
                     "TaxiIn", "TaxiOut", "Cancelled", "CancellationCode", "Diverted",
                     "CarrierDelay", "WeatherDelay", "NASDelay",
                     "SecurityDelay", "LateAircraftDelay")
  return(keyval(NULL, values))
})

mapper.year.market.enroute_time <- function(key, val){
  # Skip header lines, cancellations, and diversions:
  if(!identical(as.character(val['Year']), 'Year')
     & identical(as.numeric(val['Cancelled']), 0)
     & identical(as.numeric(val['Diverted']), 0)){
    # We don't care about direction of travel, so construct 'market'
    # with airports ordered alphabetically (e.g., LAX to JFK becomes 'JFK-LAX')
    if(val['Origin'] < val['Dest']){
      market <- paste(val['Origin'], val['Dest'], sep='-')
    } else market <- paste(val['Dest'], val['Origin'], sep='-')
    # key consists of year, market
    output.key = c(val['Year'], market)
    # output gate-to-gate elapsed times (CRS and actual) + time in air
    output.val = c(val['CRSElapsedTime'], val['ActualElapsedTime'], val['AirTime'])
    return(keyval(output.key, output.val))
  }
}

reducer.year.market.enroute_time = function(key, val.list){
  if(require(plyr)) val.df = ldply(val.list, as.numeric) else {
    # this is as close as my deficient *apply skills can come w/o plyr
    val.list = lapply(val.list, as.numeric)
    val.df = data.frame(do.call(rbind, val.list))
  }
  colnames(val.df) = c('actual','crs','air')
  output.key = key
  output.val = c(nrow(val.df), mean(val.df$actual, na.rm = T),
                 mean(val.df$crs, na.rm = T), mean(val.df$air, na.rm = T))
  return(keyval(output.key, output.val))
}

mr.year.market.enroute_time = function(input, output){
  mapreduce(input = input,
            output = output,
            input.format = asa.csvtextinputformat,
            map = mapper.year.market.enroute_time,
            reduce = reducer.year.market.enroute_time,
            backend.parameters = list(hadoop = list(D = "mapred.reduce.tasks=10")),
            verbose = T)
}

hdfs.output.root <- "/usr/cloudera/airline/data/out"
hdfs.output.path = file.path(hdfs.output.root, 'enroute-time')
results = mr.year.market.enroute_time(hdfs.input.path, hdfs.output.path)
Follow ups
- Antonio Piccolboni: can you add rmr.str(vv) to this program just before the failed call, run, get stderr again and share here? Sorry it is a little cumbersome, but since I can't repro you have to be my operator (unless I can have access to your system, which may be even more complicated to do in a safe way). It is also valuable for people to see some debugging as it happens, I think. Thanks, Antonio
- Antonio Piccolboni: I mean: from.dfs(mapreduce(to.dfs(mtcars), map = function(k, v) keyval(v[, c('carb', 'cyl')], v), reduce = function(k, vv) { rmr.str(vv); aggregate.data.frame(vv, by = vv[, c("carb", "cyl")], mean) }))
- Antonio Piccolboni: No need to do anything, I found the problem. Hang in there.
Discussion Overview
group: rhadoop
posted: Nov 22, '12 at 7:29p
active: Feb 27, '13 at 7:47a
posts: 34
users: 8