How to input HDFS file into R mapreduce for processing and get the result into HDFS file


I have a question similar to the below link in stackoverflow
R+Hadoop: How to read CSV file from HDFS and execute mapreduce?
I am tring to read a file from location "/somnath/logreg_data/ds1.10.csv" in HDFS, reduce its number of columns from 10 to 5 and then write to another location "/somnath/logreg_data/reduced/ds1.10.reduced.csv" in HDFS using the below transfer.csvfile.hdfs.to.hdfs.reduced function.

transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)

The function definition is
transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        #r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = from.dfs(hdfsFilePath),
                                          input.format = "native",
                                          map = function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                print(label)
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))}
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        #to.dfs(reduced.values)
                }
But I am receiving an error
Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted
OR
When I am trying to load a file from hdfs using the below commands, I am getting the below error:
> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",  :
  attempt to apply non-function
Any help will be highly appreciated
Thanks
share|improve this question

1 Answer

up vote 0 down vote accepted
Basically found a solution to the problem that I stated above.
r.file <- hdfs.file(hdfsFilePath,"r")
from.dfs(
    mapreduce(
         input = as.matrix(hdfs.read.text.file(r.file)),
         input.format = "csv",
         map = ...
))
Below is the entire modified function:
transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        hdfs.init()
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                numRows <- length(M)
                                                M.vec.elems <-unlist(lapply(M,
                                                                                function(x) strsplit(x, ",")))
                                                M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                label <- M.matrix[,dim(M.matrix)[2]]
                                                reduced.predictors <- M.matrix[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = as.matrix(hdfs.read.text.file(r.file)),
                                          input.format = "csv",
                                          map = function(.,M) {
                                                numRows <- length(M)
                                                M.vec.elems <-unlist(lapply(M,
                                                       function(x) strsplit(x, ",")))
                                                M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                label <- M.matrix[,dim(M.matrix)[2]]
                                                reduced.predictors <- M.matrix[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M)) }
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        hdfs.close(r.file)
                        hdfs.close(w.file)
                        #to.dfs(reduced.values)
                }
Hope this helps and don't forget to give points if you find it useful. Thanks ahead

No comments:

Post a Comment