Manipulating a csv file. A small tutorial.

Ashok Kumar Harnal
11/4/13
Here is a small tutorial on manipulating a csv file.

Problem: From the file 'stock.csv', filter out the year-wise data for the stock symbol KOOL and, for this symbol,
calculate the per-day gain/variation in stock prices. A sample of the data (with column headings) is shown below:

exchange,stock_symbol,date,stock_price_open,stock_price_high,stock_price_low,stock_price_close,stock_volume,stock_price_adj_close
NASDAQ,KINS,2010-02-08,2.90,2.90,2.90,2.90,000,2.90
NASDAQ,KINS,2010-02-05,2.90,2.90,2.90,2.90,700,2.90
NASDAQ,KINS,2010-02-04,2.96,2.96,2.96,2.96,000,2.96
  :
  :
NASDAQ,KINS,2009-10-27,2.25,2.25,2.25,2.25,100,2.25
NASDAQ,KINS,2009-10-26,1.84,2.20,1.84,2.20,500,2.20
NASDAQ,KINS,2009-10-23,2.16,2.17,2.16,2.17,900,2.17
  :
  :
NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88
  :
  :
This file is stored in hadoop file system at the location /user/test/stock.csv.

Solution: This problem could be solved using Hive over Hadoop, but here it is done
using RHadoop. Our 'map' function should emit the year (say, 2004) as key; for each
'year' key there will be at most 366 rows of KOOL data (assuming the stock market is open
on all days of the year). That is, a sample of the (key, value) data emitted by the map function should look like this:

key   value
1997  NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
1997  NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
1997  NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
1997  NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
1997  NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88
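
The key/value layout above can be tried locally, without Hadoop. The following is only a minimal sketch in plain R: it uses three rows copied from the sample data, and the variable names (rows, kool, keys, groups) are made up for illustration. split() stands in for the grouping that Hadoop performs between map and reduce.

```r
# Three rows copied from the sample data (no header, so columns are V1..V9)
rows <- read.csv(text = "NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
NASDAQ,KINS,2010-02-05,2.90,2.90,2.90,2.90,700,2.90",
                 header = FALSE, stringsAsFactors = FALSE)

# Keep only the KOOL rows, then use the first four characters of the date as key
kool <- rows[rows[, 2] == "KOOL", ]
keys <- as.numeric(substr(kool[, 3], 1, 4))

# split() mimics the shuffle step: one group of rows per distinct key
groups <- split(kool, keys)
names(groups)   # "1997"
```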

At the reduce stage, each reducer sums the 9th column (adjusted closing price) and the 4th column (opening price)
of its filtered data; the difference between the two sums is taken afterwards, when plotting.
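
As a hedged illustration of that arithmetic, here is a local sketch in plain R using the five 1997 KOOL rows from the sample above. The variable names are invented for this example and it does not use rmr2 at all.

```r
# The five 1997 KOOL rows from the sample data (columns V1..V9)
kool_1997 <- read.csv(text = "NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88",
                      header = FALSE, stringsAsFactors = FALSE)

sum_open      <- sum(kool_1997[, 4])   # 4th column: opening price
sum_adj_close <- sum(kool_1997[, 9])   # 9th column: adjusted closing price
gain          <- sum_adj_close - sum_open

c(rows = nrow(kool_1997), open = sum_open, adj_close = sum_adj_close, gain = gain)
```

Note that the difference of the two sums equals the sum of the daily (adjusted close minus open) differences, so it is a yearly total rather than a per-day figure.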

The map function would be, as below:

# Specify the format of the input file, for use in the map function
myformat<-make.input.format("csv",sep=",")
searchsymbol<-"KOOL"

map = function(k,stockdata) {
  # Convert the second column (stock symbols) to vector format. Default is data.frame.
  s_symbols<-as.vector(as.matrix(stockdata[ , 2] ))
  # Create an empty object to contain our filtered rows
  filtered_data<-c()
  # Loop over all rows handed to the map function
  for(i in seq_along(s_symbols)) {
    # For the i-th row, compare the value in vector 's_symbols' to the symbol KOOL.
    # If the comparison is TRUE, this row is of interest. Bind it row-wise to the rows found earlier.
    if(s_symbols[i]==searchsymbol)  filtered_data<-rbind(filtered_data,stockdata[i,])
  }
  # After all rows have been gone through, emit 'filtered_data' as value and the first
  # four characters (say, 1997) of the string (say, 1997-05-07) in the third column as key
  keyval(as.numeric(substr(filtered_data[ ,3], 1, 4)),filtered_data)
}

All rows having the same key (say, 1997) are fed to the same reduce call (this is how reducers operate),
so each call sees the rows of exactly one year. The reducer function is as follows:

reduce = function(year,symbol_data) {
  # How many rows for this year
  noOfRowsperyear<-length(symbol_data[,9])
  # Extract the 9th column (adjusted closing price) as a vector
  stock_price_adj_close<-as.vector(as.matrix(symbol_data[,9]))
  # Extract the 4th column (opening price) as a vector
  stock_price_open<-as.vector(as.matrix(symbol_data[,4]))
  # Convert both vectors to numeric and sum them
  yearwiseSum_stockPriceAdjClose<-sum(as.numeric(stock_price_adj_close))
  yearwiseSum_stockPriceOpen<-sum(as.numeric(stock_price_open))
  # Prepare the 'value' to be emitted by combining the results into one row
  output<-cbind(noOfRowsperyear,yearwiseSum_stockPriceOpen,yearwiseSum_stockPriceAdjClose)
  # Year-wise results
  keyval(year,output)
}

#Run the mapreduce now:
year_wisedata<-mapreduce(input= '/user/test/stock.csv', map = map, reduce = reduce, input.format=myformat)
# Get the result from hdfs
result<-from.dfs(year_wisedata)
# Display result
result$val
result$key
# Combine result to plot
xy<-cbind(result$key,result$val)
# Plot it now
plot(xy[ ,1] , xy[ ,4]- xy[ ,3] )

A printout of the result, xy, is as follows:
           noOfRowsperyear yearwiseSum_stockPriceOpen yearwiseSum_stockPriceAdjClose
 [1,] 1995              92                      89.14                         176.40
 [2,] 1996             254                     723.25                         889.54
 [3,] 1997             252                     844.34                         843.54
 [4,] 1998             250                     540.37                         537.19




Antonio Piccolboni
11/4/13
Hi Ashok, 
thanks for this example. A few comments: since you are extracting a small amount of data that can be processed in a single reducer, this may not be the ideal use case for mapreduce. You could as well query the data with, say, hive or impala and continue the computation in memory. But if you just grouped by stock symbol instead of filtering a single one, that alone would make it a better match for mapreduce. 
Grouping by year as you did serves no purpose other than preventing you from computing the change in price between the last day of one year and the first day of the next, and because of that your program is incorrect, strictly speaking. You could solve this problem as a map-only filter and then compute the changes in the master R instance after the from.dfs. It seems like you absolutely wanted to specify a key, but the year is counterproductive. For some people it helps to think of it in SQL terms: would you group by year if all you wanted were day-by-day changes? No, you would group by symbol. If one time series alone is too big, then you need a more complex grouping by time intervals with overlaps. One possible implementation of that is provided in the new package plyrmr, function moving.window.
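
To make the year-boundary point concrete, here is a small local sketch in plain R. It uses the six KINS closing prices from the sample at the top of the thread, simply because those rows span two years. They are not adjacent trading days in the real file (the sample elides rows in between), and the variable names are invented for this illustration.

```r
# Date and closing price (7th column) of the six KINS sample rows
kins <- data.frame(
  date  = as.Date(c("2010-02-08", "2010-02-05", "2010-02-04",
                    "2009-10-27", "2009-10-26", "2009-10-23")),
  close = c(2.90, 2.90, 2.96, 2.25, 2.20, 2.17)
)
kins <- kins[order(kins$date), ]   # oldest first

# Changes over the whole series: one value per consecutive pair of rows
changes_all <- diff(kins$close)

# Changes computed separately inside each year, as a year-keyed reducer would do
year <- format(kins$date, "%Y")
changes_by_year <- lapply(split(kins$close, year), diff)

length(changes_all)             # 5
sum(lengths(changes_by_year))   # 4: the step across the year boundary is lost
```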
Now, talking about R code independent of Hadoop: that for loop over the rows is, unfortunately, an R anti-pattern, and the reason is simple: loops are slow (for loops, apply family calls, it doesn't matter). You always need to convert the inner loop into a vectorized call to the C library, or write a C extension if necessary, and it's not as hard as it seems. The potential speed-up is 100X compared to a naive for loop. The other thing: accruing a data frame one row at a time is also not an option, since its complexity is proportional to the square of the number of rows added.

> system.time({z = data.frame(); for (i in 1:5000) {z = rbind(z, mtcars[sample(1:32, 1),])}})
   user  system elapsed 
 10.351   1.150  11.519 
> system.time({z = data.frame(); for (i in 1:10000) {z = rbind(z, mtcars[sample(1:32, 1),])}})
   user  system elapsed 
 41.123   4.943  46.081 


See? 2X data, 4X time. This approach is not going very far, almost a minute for 10000 rows! In your case the data frames involved may be small enough that it doesn't matter, but I thought it would be helpful to point out.
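
For comparison, here is a hedged sketch of the vectorized alternative, using the same built-in mtcars data as the timing above. The names idx, z_loop, z_vec and four_cyl are made up for this example, and n is kept small so the loop version finishes quickly.

```r
set.seed(1)
idx <- sample(1:32, 200, replace = TRUE)

# Row-at-a-time growth, as in the timing above (small n only)
z_loop <- data.frame()
for (i in idx) z_loop <- rbind(z_loop, mtcars[i, ])

# Vectorized: a single indexing call builds the whole result
z_vec <- mtcars[idx, ]

# The same idea applied to a filter: a logical vector selects the rows, no loop
four_cyl <- mtcars[mtcars$cyl == 4, ]
nrow(four_cyl)
```

Applied to the map function in the first post, the filter would presumably reduce to something like stockdata[stockdata[, 2] == searchsymbol, ], though that has not been tested against the real data.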
Finally, a little advertisement for the new package plyrmr. Let's modify your input format to have meaningful column names:

myfmt = make.input.format("csv", sep = ",", col.names = c("exchange","stock_symbol","date","stock_price_open","stock_price_high","stock_price_low","stock_price_close","stock_volume","stock_price_adj_close"))

as.data.frame(where(input( '/user/test/stock.csv', format = myfmt), stock_symbol == "KOOL"))

would filter the data for KOOL and return it as a data frame with at most a few thousand rows. If you wanted to do all symbols in one go and compute variations, as you set out to do originally, instead of sums, as you actually ended up doing:

do(
  group(
    input ('/user/test/stock.csv', format = myfmt), 
    stock_symbol), 
  function(x) 
    apply(
      arrange(x, date), 
      2, 
      diff))

I don't have data or time to test this out, so small corrections may be needed.
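
One such correction is likely needed because apply() coerces a data frame to a matrix, so diff would hit the non-numeric columns (exchange, symbol, date). The following is a local, base R sketch of the same group-then-diff idea, restricted to numeric price columns. It is not plyrmr code; the five rows are copied from the sample data and the names stock, price_cols and daily_changes are invented here.

```r
stock <- read.csv(text = "NASDAQ,KINS,2010-02-08,2.90,2.90,2.90,2.90,000,2.90
NASDAQ,KINS,2010-02-05,2.90,2.90,2.90,2.90,700,2.90
NASDAQ,KINS,2010-02-04,2.96,2.96,2.96,2.96,000,2.96
NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88",
  header = FALSE, stringsAsFactors = FALSE,
  col.names = c("exchange", "stock_symbol", "date", "stock_price_open",
                "stock_price_high", "stock_price_low", "stock_price_close",
                "stock_volume", "stock_price_adj_close"))

price_cols <- c("stock_price_open", "stock_price_close", "stock_price_adj_close")

# One data frame of row-to-row changes per symbol
daily_changes <- lapply(split(stock, stock$stock_symbol), function(x) {
  x <- x[order(x$date), ]                    # ISO dates sort correctly as text
  as.data.frame(lapply(x[price_cols], diff)) # change from one row to the next
})
daily_changes$KOOL
```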


Antonio
Ashok Kumar Harnal
11/6/13
Thanks for your time and effort.

I have noted your comments that the "for loop on the rows, unfortunately that's an R anti-pattern" and that there is no need for multiple reducers, as the data size is not very large. I have not yet installed plyrmr, but the program has now been changed to use ddply() so as to avoid loops.
The modifications are now as follows:

# We split the stock data (a data frame) first by stock_symbol, then by year (say 1998, not by date). This is done using the ddply() function of the plyr package. ddply() takes subsets of a data frame by the specified variables, applies
# the specified function(s) to each subset and then combines the results into a data frame. This combined data frame is returned as output.
# To our stock data we apply sum functions over the opening and (adjusted) closing prices. The sums are computed per year.

myformat<-make.input.format("csv", sep=",")
# Define two functions to apply them over columns of split data:
closingSum<- function(x) { sum(x[, 9])}         # Sum up 9th column (adjusted closing price)
openingSum<- function(x) { sum(x[, 4])}        # Sum up 4th column (opening price)

map = function(k,stockdata) {
  year = as.numeric(substr(stockdata[,3],1,4))
  # Split stockdata, first by the 2nd column (i.e. symbol), then by year. Then, for each symbol and each year (for this symbol), compute the sums.
  yearwise_sum_data<- ddply(stockdata, .(stockdata[, 2], year), c("closingSum","openingSum") )
  keyval(k, yearwise_sum_data)
}

# Apply mapreduce now
result<-mapreduce(input = "/user/test/stock.csv", map = map , input.format=myformat)
# Read the result from hadoop file system using from.dfs()
ret<-from.dfs(result)

# We get arranged results as below (ret$val) for all stocks (one by one).

   stockdata[, 2] year closingSum openingSum
X1           KENT 1993     134.75     586.06
X2           KENT 1994     108.64     473.45
X3           KENT 1995     142.98     622.10
X4           KENT 1996     158.97     699.90
X5           KENT 1997     132.59     578.82
X6           KENT 1998     177.22     654.00
  :
  :
X42.11       KTEC 2005    2923.56    2911.06
X43.11       KTEC 2006    3149.95    3146.13
X44.10       KTEC 2007    5895.26    5873.61
X45.9        KTEC 2008    7095.27    7107.13
X46.7        KTEC 2009    2867.09    2865.51
X47.3        KTEC 2010     339.31     335.90
  :
  :
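
One caveat worth checking: a map-only job calls the map function once per input chunk, so if the file is split into several chunks, the same symbol/year pair may appear more than once in ret$val, each time with a partial sum. Whether that happened here is not known. The sketch below, in plain R, shows how such partial sums could be combined on the master. The split of KENT 1993 into two parts is entirely hypothetical (made-up numbers chosen to add up to the totals printed above), and the column names are assumptions.

```r
# Hypothetical partial sums: KENT 1993 arrives from two chunks (made-up split),
# KENT 1994 from a single chunk
partial <- data.frame(
  symbol     = c("KENT", "KENT", "KENT"),
  year       = c(1993, 1993, 1994),
  closingSum = c(100.00, 34.75, 108.64),
  openingSum = c(400.00, 186.06, 473.45)
)

# Add up the partial sums for each (symbol, year) pair
combined <- aggregate(cbind(closingSum, openingSum) ~ symbol + year,
                      data = partial, FUN = sum)
combined
```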

# I can now plot the above results, by grouping on stock symbol, as below.  Function xyplot() of lattice package is used.

require(lattice)
yaxis<-ret$val[, 3] - ret$val[, 4]           # Per year gain (absolute, maybe percentage would have been better)
xaxis<-ret$val[, 2]                             # Year
groupby<-ret$val[, 1]                         # Each plot per symbol
xyplot(yaxis ~ xaxis | groupby, ylab="Per year gain", xlab="Year", scales="free")

# As 'for-loops' have been avoided, this process is much faster than the earlier one.
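
As a hedged follow-up to the remark that a percentage might have been better, here is a small sketch in plain R that computes the gain relative to the opening sum. It uses the KTEC rows printed above; the data frame and column names gain and gain_pct are invented for this example.

```r
# KTEC rows copied from the results printed above
ktec <- data.frame(
  year       = 2005:2010,
  closingSum = c(2923.56, 3149.95, 5895.26, 7095.27, 2867.09, 339.31),
  openingSum = c(2911.06, 3146.13, 5873.61, 7107.13, 2865.51, 335.90)
)

ktec$gain     <- ktec$closingSum - ktec$openingSum   # absolute gain, as plotted
ktec$gain_pct <- 100 * ktec$gain / ktec$openingSum   # gain relative to opening sum

round(ktec$gain_pct, 2)
```

A relative measure like this makes years with very different numbers of trading days or price levels easier to compare on one plot.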

Thanks again and with regards,

1 comment:

  1. Dear Sir,
    I have imported two csv files as data sets into RHadoop on 64-bit Ubuntu 10.4, and I would like to know which commands to use for the tasks below. I am new to Linux, so even sample commands would help me get to the next step.

    1. The first file has the data traffic volume separated per cluster.

    - I need to know which areas have a high data traffic volume.

    - I need to predict the trend over the next year, per cluster.

    2. The second file contains the data traffic volume.

    - I need to predict the trend over the next year.

    Please advise me on which commands to use.

    Thank you
