Manipulating a csv file. A small tutorial.
3 posts by 2 authors
Ashok Kumar Harnal | 11/4/13
Here is a small tutorial on manipulating csv file.
Problem: Filter out, year-wise, the data for the stock symbol KOOL from the file 'stock.csv' and, for this stock symbol,
calculate the per-day gain/variation in stock prices. A sample of the data (with column headings) is given below:
exchange,stock_symbol,date,stock_price_open,stock_price_high,stock_price_low,stock_price_close,stock_volume,stock_price_adj_close
NASDAQ,KINS,2010-02-08,2.90,2.90,2.90,2.90,000,2.90
NASDAQ,KINS,2010-02-05,2.90,2.90,2.90,2.90,700,2.90
NASDAQ,KINS,2010-02-04,2.96,2.96,2.96,2.96,000,2.96
:
:
NASDAQ,KINS,2009-10-27,2.25,2.25,2.25,2.25,100,2.25
NASDAQ,KINS,2009-10-26,1.84,2.20,1.84,2.20,500,2.20
NASDAQ,KINS,2009-10-23,2.16,2.17,2.16,2.17,900,2.17
:
:
NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88
:
:
This file is stored in the Hadoop file system at the location /user/test/stock.csv.
Solution: This problem can be solved using Hive over Hadoop, but here it has been done using RHadoop.
Our 'map' function should emit the year (say, 2004) as key; corresponding to this 'year' key there will be
at most 365 rows of KOOL data (assuming the stock market were open on every day of the year).
That is, a sample of the (key, value) pairs emitted by the map function would look like:
key     value
1997    NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
1997    NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
1997    NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
1997    NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
1997    NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88
At the reduce stage, each reducer sums up the 9th column and the 4th column of the filtered data;
the difference between the two sums is taken later, when plotting.
The map function is as below:
# Specify the format of the input file, for use in the map function
library(rmr2)    # provides make.input.format(), mapreduce(), keyval(), from.dfs()
myformat <- make.input.format("csv", sep = ",")
searchsymbol <- "KOOL"
map = function(k, stockdata) {
  # Convert the second column (stock symbols) to vector format. Default is data.frame.
  s_symbols <- as.vector(as.matrix(stockdata[, 2]))
  # Create an empty vector to contain our filtered rows
  filtered_data <- c()
  # Loop over all rows in the stock.csv file
  for (i in 1:length(stockdata[, 2])) {
    # For the i-th row number, compare the value in vector 's_symbols' to the symbol KOOL.
    # If the comparison is TRUE, this row is of interest; bind it, row-wise, to the rows found earlier.
    if (s_symbols[i] == searchsymbol) filtered_data <- rbind(filtered_data, stockdata[i, ])
  }
  # After all rows have been gone through, emit 'filtered_data' as value and the first
  # four letters (say, 1997) of the string (say, 1997-05-07) in the third column of the stock data as key
  keyval(as.numeric(substr(filtered_data[, 3], 1, 4)), filtered_data)
}
All rows having the same key (say, 1997) will be fed to one reducer (this is how reducers operate);
that is, different reducers receive rows with different keys. The reduce function will be as below:
reduce = function(year, symbol_data) {
  # How many rows for every year
  noOfRowsperyear <- length(symbol_data[, 9])
  # Convert the 9th column (adjusted closing price) to a vector
  stock_price_adj_close <- as.vector(as.matrix(symbol_data[, 9]))
  # Convert the 4th column (opening price) to a vector
  stock_price_open <- as.vector(as.matrix(symbol_data[, 4]))
  # Sum each column after conversion to numeric
  yearwiseSum_stockPriceAdjClose <- sum(as.numeric(stock_price_adj_close))
  yearwiseSum_stockPriceOpen <- sum(as.numeric(stock_price_open))
  # Prepare the 'value' to be emitted, after combining the results into one row
  output <- cbind(noOfRowsperyear, yearwiseSum_stockPriceOpen, yearwiseSum_stockPriceAdjClose)
  # Year-wise results
  keyval(year, output)
}
# Run the mapreduce now:
year_wisedata <- mapreduce(input = '/user/test/stock.csv', map = map, reduce = reduce, input.format = myformat)
# Get the result from hdfs
result <- from.dfs(year_wisedata)
# Display result
result$val
result$key
# Combine result to plot
xy <- cbind(result$key, result$val)
# Plot it now
plot(xy[, 1], xy[, 4] - xy[, 3])
A printout of the result, xy, is as below:
          noOfRowsperyear yearwiseSum_stockPriceOpen yearwiseSum_stockPriceAdjClose
[1,] 1995              92                      89.14                         176.40
[2,] 1996             254                     723.25                         889.54
[3,] 1997             252                     844.34                         843.54
[4,] 1998             250                     540.37                         537.19
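Optionally (this was not part of the run above), column names can be attached to xy so that the plot call documents itself; a small sketch using the same objects:
colnames(xy) <- c("year", "noOfRowsperyear", "yearwiseSum_stockPriceOpen", "yearwiseSum_stockPriceAdjClose")
plot(xy[, "year"], xy[, "yearwiseSum_stockPriceAdjClose"] - xy[, "yearwiseSum_stockPriceOpen"],
     xlab = "Year", ylab = "Sum of adj. close minus sum of open")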
Antonio Piccolboni | 11/4/13
Hi Ashok,
Thanks for this example. A few comments: since you are extracting a small amount of data that can be processed in a single reducer, this may not be the ideal use case for mapreduce. You could as well query the data with, say, hive or impala and continue the computation in memory. But if you just grouped by stock symbol instead of filtering a single one, that alone would make it a better match for mapreduce.
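A quick, untested sketch of what I mean, reusing the keyval() style from your map function: emit the symbol itself (column 2) as key, so that each reducer receives one complete time series (the name map.by.symbol is made up):
map.by.symbol = function(k, stockdata) {
  # the key is now the stock symbol; rmr2 groups the rows by key before the reduce phase
  keyval(as.character(stockdata[, 2]), stockdata)
}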
Grouping by year as you did serves no purpose other than preventing you from computing the change in price between the last day of one year and the first day of the next, and because of that your program is, strictly speaking, incorrect. You could solve this problem as a map-only filter and then compute the changes in the master R instance after the from.dfs (a rough sketch is below). It seems like you absolutely wanted to specify a key, but the year is counterproductive. For some people it helps to think of it in SQL terms: would you group by year if all you wanted were day-by-day changes? No, you would group by symbol. If one time series alone is too big, then you need a more complex grouping by time intervals with overlaps. One possible implementation of that is provided in the new package plyrmr, function moving.window.
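A rough sketch of the map-only variant (untested; column numbers as in your post, and 'kool_job', 'kool', 'day_to_day_change' are made-up names):
# map-only job: keep just the KOOL rows; no reduce argument is given
kool_job <- mapreduce(
  input = '/user/test/stock.csv',
  input.format = myformat,
  map = function(k, v) v[v[, 2] == "KOOL", ])
# back in the master R instance: fetch the rows, order by date (column 3) and take day-to-day changes
kool <- values(from.dfs(kool_job))
kool <- kool[order(kool[, 3]), ]
day_to_day_change <- diff(kool[, 9])   # change in the adjusted close from one trading day to the next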
Now talking about R code, independent of Hadoop: that for loop on the rows is unfortunately an R anti-pattern, and the reason is simple: loops are slow (for loops, apply-family calls, it doesn't matter). You always need to convert the inner loop into a vectorized call into the C library, or write a C extension if necessary, and it's not as hard as it seems. The potential speed-up is 100X compared to a naive for loop. The other thing, accruing a data frame one row at a time, is also not an option: its complexity is proportional to the square of the number of rows added (a vectorized rewrite of your filtering step is sketched after the timing example below).
> system.time({z = data.frame(); for (i in 1:5000) {z = rbind(z, mtcars[sample(1:32, 1),])}})
user system elapsed
10.351 1.150 11.519
> system.time({z = data.frame(); for (i in 1:10000) {z = rbind(z, mtcars[sample(1:32, 1),])}})
user system elapsed
41.123 4.943 46.081
See? 2X data, 4X time. This approach is not going very far, almost a minute for 10000 rows! In your case the data frames involved may be small enough that it doesn't matter, but I thought it would be helpful to point out.
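For the record, the whole loop plus repeated rbind() in your map function collapses to a single vectorized subset (same variable names as in your post):
# one vectorized comparison on column 2 replaces the row loop and the row-by-row rbind()
filtered_data <- stockdata[stockdata[, 2] == searchsymbol, ]
# if several pieces really must be accumulated, collect them in a list and bind once at the end,
# e.g. do.call(rbind, list_of_pieces), instead of growing the data frame inside the loop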
Finally, a little advertisement for the new package plyrmr. Let's modify your input format to have meaningful column names:
myfmt = make.input.format("csv", sep = ",", col.names = c("exchange", "stock_symbol", "date", "stock_price_open", "stock_price_high", "stock_price_low", "stock_price_close", "stock_volume", "stock_price_adj_close"))
as.data.frame(where(input('/user/test/stock.csv', format = myfmt), stock_symbol == "KOOL"))
would filter the data for KOOL and return it as a data frame with at most a few thousand rows. If you wanted to do all symbols in one go and compute variations, as you set out to do originally, instead of sums, as you actually ended up doing:
do(
  group(
    input('/user/test/stock.csv', format = myfmt),
    stock_symbol),
  function(x)
    apply(
      arrange(x, date),
      2,
      diff))
I don't have data or time to test this out, so small corrections may be needed.
Antonio
Ashok Kumar Harnal | 11/6/13
Thanks for your time and effort.
I have noted your comments ("for loop on the rows, unfortunately that's an R anti-pattern") and that there is no need for multiple reducers, as the data size is not very large. I have not yet installed plyrmr, but the program has now been changed to use ddply() so as to avoid loops.
The modifications are now as follows:
# We split the stock data (a data frame) first over stock_symbol, then by year (say 1998, not by date).
# This is done using the ddply() function of the plyr package. ddply() takes subsets of a data frame
# by the specified variables, applies the specified function(s) to each subset, and then combines the
# results into a data frame. This combined data frame is returned as output.
# To our stock data, we apply the sum function over the opening and the closing (adjusted) prices.
# Summation is done year-wise. (A standalone toy example of this ddply() step is shown after the full program below.)
myformat <- make.input.format("csv", sep = ",")
library(plyr)   # ddply() used inside the map function comes from the plyr package
# Define two functions to apply over the columns of the split data:
closingSum <- function(x) { sum(x[, 9]) }   # Sum up 9th column (adjusted closing price)
openingSum <- function(x) { sum(x[, 4]) }   # Sum up 4th column (opening price)
map = function(k, stockdata) {
  year = as.numeric(substr(stockdata[, 3], 1, 4))
  # Split stockdata, first by the 2nd column (i.e. symbol), then by year.
  # Then, for each symbol and for each year (for this symbol), take out the sums.
  yearwise_sum_data <- ddply(stockdata, .(stockdata[, 2], year), c("closingSum", "openingSum"))
  keyval(k, yearwise_sum_data)
}
# Apply mapreduce now
result<-mapreduce(input = "/user/test/stock.csv", map = map , input.format=myformat)
# Read the result from hadoop file system using from.dfs()
ret<-from.dfs(result)
# We get arranged results as below (ret$val) for all stocks (one by one).
stockdata[, 2] year closingSum openingSum
X1 KENT 1993 134.75 586.06
X2 KENT 1994 108.64 473.45
X3 KENT 1995 142.98 622.10
X4 KENT 1996 158.97 699.90
X5 KENT 1997 132.59 578.82
X6 KENT 1998 177.22 654.00
:
:
X42.11 KTEC 2005 2923.56 2911.06
X43.11 KTEC 2006 3149.95 3146.13
X44.10 KTEC 2007 5895.26 5873.61
X45.9 KTEC 2008 7095.27 7107.13
X46.7 KTEC 2009 2867.09 2865.51
X47.3 KTEC 2010 339.31 335.90
:
:
# I can now plot the above results, by grouping on stock symbol, as below. Function xyplot() of lattice package is used.
require(lattice)
yaxis<-ret$val[, 3] - ret$val[, 4] # Per year gain (absolute, maybe percentage would have been better)
xaxis<-ret$val[, 2] # Year
groupby<-ret$val[, 1] # Each plot per symbol
xyplot(yaxis ~ xaxis | groupby, ylab="Per year gain", xlab="Year", scales="free")
# As 'for-loops' have been avoided, this process is much faster than the earlier one.
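For anyone wanting to check the ddply() step without a cluster, here is a toy sketch (made-up data, plain plyr, no Hadoop; summarise() is used in place of the named helper functions):
library(plyr)
toy <- data.frame(stock_symbol = c("KOOL", "KOOL", "KINS"),
                  year         = c(1997, 1997, 2010),
                  open         = c(3.91, 3.88, 2.90),
                  adj_close    = c(3.56, 3.81, 2.90))
ddply(toy, .(stock_symbol, year), summarise,
      closingSum = sum(adj_close), openingSum = sum(open))
#   stock_symbol year closingSum openingSum
# 1         KINS 2010       2.90       2.90
# 2         KOOL 1997       7.37       7.79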
Thanks again and with regards,
Dear Sir,
I want to know the commands to run this job on Ubuntu 10.04 64-bit with RHadoop.
I have already imported two CSV files into Ubuntu 10.04 64-bit + RHadoop, but I do not know which commands to use for my target below. Sample commands are enough; I will work out the next steps from there, as I am new to Linux.
1. The first file has the data traffic volume separated per cluster.
- Need to know which areas have high data traffic volume.
- Predict the trend for the next year, per cluster.
2. The second file has the combined data traffic volume.
- Predict the trend for the next year.
Please advise on the commands to use.
Thank you