The following steps will help us to understand more operations with DataFrames on SparkR by analyzing a New York flights dataset:
[cloudera@quickstart ~]$ wget https://s3-us-west-2.amazonaws.com/sparkr-data/nycflights13.csv --no-check-certificate [cloudera@quickstart ~]$ hadoop fs -put nycflights13.csv flights.csv
[cloudera@quickstart ~]$ cd spark-2.0.0-bin-hadoop2.7/ [cloudera@quickstart spark-2.0.0-bin-hadoop2.7]$ bin/sparkR > install.packages("magrittr", dependencies = TRUE) > library(magrittr) > flights <- read.df("flights.csv",source="csv", header="true", inferschema="true") > flights SparkDataFrame[year:int, month:int, day:int, dep_time:int, dep_delay:int, arr_time:int, arr_delay:int, carrier:string, tailnum:string, flight:int, origin:string, dest:string, air_time:int, distance:int, hour:int, minute:int]
Cache the DataFrame so that the queries that follow can reuse it: > cache(flights)
Count the number of rows in the flights DataFrame: > count(flights) [1] 336776
Print the first six rows of the DataFrame using the head method, or use the showDF method to print an SQL-like printout. If you want to print the first two rows only, use the take method: > head(flights) or > showDF(flights) > take(flights, 2)
Print summary statistics for each column using the describe method: > desc <- describe(flights) > collect(desc) summary year month day 1 count 336776 336776 336776 2 mean 2013.0 6.548509988835309 15.71078699194717 3 stddev 1.1368646858726556E-13 3.414457244678908 8.768607101536823 4 min 2013 1 1 5 max 2013 12 31
Filter the rows whose dep_delay is greater than 100, and select only the origin, dest, and dep_delay columns: > filter <- filter(flights, "dep_delay > 100") > delay100 <- select(filter, c("origin", "dest", "dep_delay")) > head(delay100) origin dest dep_delay 1 LGA CLT 101 2 JFK BWI 853 3 EWR BOS 144 4 LGA IAH 134 5 EWR RIC 115 6 EWR MCO 105
Count the number of flights for each carrier, and print the carriers sorted by count in descending order: > carriers <- count(groupBy(flights, "carrier")) > head(arrange(carriers, desc(carriers$count))) carrier count 1 UA 58665 2 B6 54635 3 EV 54173 4 DL 48110 5 AA 32729 6 MQ 26397
Find the most frequent destinations from JFK airport. Group the flights by destination airport from JFK, aggregate by the number of flights, and then sort by the count column. Print the first six rows: > jfk_origin <- filter(flights, flights$origin == "JFK") > jfk_dest <- agg(group_by(jfk_origin, jfk_origin$dest), count = n(jfk_origin$dest)) > head(arrange(jfk_dest, desc(jfk_dest$count))) dest count 1 LAX 11262 2 SFO 8204 3 BOS 5898 4 MCO 5464 5 SJU 4752 6 FLL 4254
Register the DataFrame as a temporary view, and query it using SQL: > createOrReplaceTempView(flights, "flightsTable") > delayDF <- sql("SELECT dest, arr_delay FROM flightsTable") > head(delayDF) dest arr_delay 1 IAH 11 2 IAH 20 3 MIA 33 4 BQN -18 5 ATL -25 6 ORD 12
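Use the following commands to create a new column, delete a column, add a derived column with mutate, and rename a column. You can check the data before and after each change using the head method: > flights$air_time_hr <- flights$air_time / 60 > flights$air_time_hr <- NULL > newDF <- mutate(flights, air_time_sec = flights$air_time * 60) > renamecolDF <- withColumnRenamed(flights, "air_time", "air_time_ren")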
Combine the JFK destinations query into a single pipeline using the %>% operator from magrittr: > jfk_dest <- filter(flights, flights$origin == "JFK") %>% group_by(flights$dest) %>% summarize(count = n(flights$dest)) > frqnt_dests <- head(arrange(jfk_dest, desc(jfk_dest$count))) > head(frqnt_dests) dest count 1 LAX 11262 2 SFO 8204 3 BOS 5898 4 MCO 5464 5 SJU 4752 6 FLL 4254
> barplot(frqnt_dests$count, names.arg = frqnt_dests$dest,col=rainbow(7),main="Top Flight Destinations from JFK", xlab = "Destination", ylab= "Count", beside=TRUE )
The bar plot is shown as follows:
Use collect to bring the SparkDataFrame back into the R session as a local data.frame: > localdf <- collect(jfk_dest)