Using DataFrames with SparkR

The following steps walk through more DataFrame operations in SparkR by analyzing the New York flights (nycflights13) dataset:

  1. As a first step, let's download the flights data and copy it to HDFS:
    [cloudera@quickstart ~]$ wget https://s3-us-west-2.amazonaws.com/sparkr-data/nycflights13.csv --no-check-certificate
    
    [cloudera@quickstart ~]$ hadoop fs -put nycflights13.csv flights.csv
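
    Optionally, you can verify that the file landed in HDFS before moving on:
    [cloudera@quickstart ~]$ hadoop fs -ls flights.csv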
    
  2. Start the SparkR shell and create a DataFrame using the CSV data source. When installing packages, choose a CRAN mirror close to you:
    [cloudera@quickstart ~]$ cd spark-2.0.0-bin-hadoop2.7/
    [cloudera@quickstart spark-2.0.0-bin-hadoop2.7]$ bin/sparkR
    
    > install.packages("magrittr", dependencies = TRUE)
    > library(magrittr)
    
    > flights <- read.df("flights.csv", source = "csv", header = "true", inferSchema = "true")
    
    > flights
    
    SparkDataFrame[year:int, month:int, day:int, dep_time:int, dep_delay:int, arr_time:int, arr_delay:int, carrier:string, tailnum:string, flight:int, origin:string, dest:string, air_time:int, distance:int, hour:int, minute:int]
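
    To view the same schema in a tree format, you can also use printSchema:
    > printSchema(flights)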
    
  3. Cache the DataFrame in memory using the following command:
    > cache(flights)
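
    Note that cache is lazy: the data is materialized only when the first action (such as count in the next step) runs. If you want an explicit storage level, or want to free the memory later, SparkR also provides persist and unpersist (optional here):
    > persist(flights, "MEMORY_AND_DISK")   # alternative to cache() with an explicit storage level
    > unpersist(flights)                    # run this only once you are done with the DataFrame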
    
  4. Count the number of rows in the flights DataFrame:
    > count(flights)
    [1] 336776
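
    The familiar R functions nrow and ncol also work on a SparkDataFrame:
    > nrow(flights)    # same result as count(flights)
    > ncol(flights)    # number of columns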
    
  5. Print the first six rows of the DataFrame using the head method, or use the showDF method for a tabular, Spark SQL-style printout. To fetch only the first two rows, use the take method:
    > head(flights)
    > showDF(flights)
    > take(flights, 2)
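
    Both head and showDF accept a row count if you want more or fewer rows, for example:
    > head(flights, 3)
    > showDF(flights, numRows = 5)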
    
  6. Print summary statistics for the numeric columns using the describe method:
    > desc <- describe(flights)
    > collect(desc)
    
      summary                   year             month               day
    1   count                 336776            336776            336776
    2    mean                 2013.0 6.548509988835309 15.71078699194717
    3  stddev 1.1368646858726556E-13 3.414457244678908 8.768607101536823
    4     min                   2013                 1                 1
    5     max                   2013                12                31
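
    describe also accepts specific column names if you only need statistics for a subset of columns, for example:
    > collect(describe(flights, "dep_delay", "arr_delay"))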
    
  7. Filter the rows on a specific condition, and display only the selected columns:
    > delayed <- filter(flights, "dep_delay > 100")
    > delay100 <- select(delayed, c("origin", "dest", "dep_delay"))
    > head(delay100)
      origin dest dep_delay
    1    LGA  CLT       101
    2    JFK  BWI       853
    3    EWR  BOS       144
    4    LGA  IAH       134
    5    EWR  RIC       115
    6    EWR  MCO       105
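
    The same filter can also be written with a column expression instead of a SQL string; both forms return the same rows:
    > delayed <- filter(flights, flights$dep_delay > 100)
    > head(select(delayed, c("origin", "dest", "dep_delay")))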
    
  8. Print the number of records grouped by carrier in descending order:
    > carriers <- count(groupBy(flights, "carrier"))
    > head(arrange(carriers, desc(carriers$count)))
      carrier count
    1      UA 58665
    2      B6 54635
    3      EV 54173
    4      DL 48110
    5      AA 32729
    6      MQ 26397
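
    The same count can be expressed with agg and the n aggregate, which also lets you name the output column explicitly:
    > carriers <- agg(groupBy(flights, "carrier"), count = n(flights$carrier))
    > head(arrange(carriers, desc(carriers$count)))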
    
  9. Run a query to print the most frequent destinations from JFK airport. Group the flights originating from JFK by destination airport, aggregate by the number of flights, and then sort by the count column. Print the first six rows:
    > jfk_origin <- filter(flights, flights$origin == "JFK")
    
    > jfk_dest <- agg(group_by(jfk_origin, jfk_origin$dest), count = n(jfk_origin$dest))
    
    > head(arrange(jfk_dest, desc(jfk_dest$count)))
    
      dest count
    1  LAX 11262
    2  SFO  8204
    3  BOS  5898
    4  MCO  5464
    5  SJU  4752
    6  FLL  4254
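
    The same grouped-aggregation pattern works with other aggregate functions; for example, the average arrival delay per destination from JFK (an optional variation, not part of the original recipe):
    > jfk_delay <- agg(group_by(jfk_origin, jfk_origin$dest), avg_delay = avg(jfk_origin$arr_delay))
    > head(arrange(jfk_delay, desc(jfk_delay$avg_delay)))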
    
  10. Now, register the DataFrame as a temporary view, and query it using SQL:
    > createOrReplaceTempView(flights, "flightsTable")
    > delayDF <- sql("SELECT dest, arr_delay FROM flightsTable")
    
    > head(delayDF)
      dest arr_delay
    1  IAH        11
    2  IAH        20
    3  MIA        33
    4  BQN       -18
    5  ATL       -25
    6  ORD        12
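
    The query from step 9 can also be written entirely in SQL against the temporary view, and should return the same destinations:
    > head(sql("SELECT dest, count(*) AS count FROM flightsTable WHERE origin = 'JFK' GROUP BY dest ORDER BY count DESC"))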
    
  11. Use the following commands to create new columns, delete columns, and rename columns. Check the data before and after deleting the column using the head method:
    > flights$air_time_hr <- flights$air_time / 60
    > flights$air_time_hr <- NULL
    > newDF <- mutate(flights, air_time_sec = flights$air_time * 60)
    > renamecolDF <- withColumnRenamed(flights, "air_time", "air_time_ren")
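
    To verify the changes, check the column names and a few rows of the derived DataFrames:
    > columns(newDF)                              # air_time_sec has been added
    > head(select(renamecolDF, "air_time_ren"))   # the renamed column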
    
  12. Combine the whole query from step 9 into a single pipeline using magrittr's %>% operator:
    > jfk_dest <- filter(flights, flights$origin == "JFK") %>% 
      group_by(flights$dest) %>% 
      summarize(count = n(flights$dest))
    
    > frqnt_dests <- head(arrange(jfk_dest, desc(jfk_dest$count)))
    
    > head(frqnt_dests)
      dest count
    1  LAX 11262
    2  SFO  8204
    3  BOS  5898
    4  MCO  5464
    5  SJU  4752
    6  FLL  4254
    
  13. Finally, create a bar plot of the frequent destinations. If you are executing this command over a PuTTY (SSH) session, the graph will not be displayed; to display it, use a shell session inside the VM's desktop:
    > barplot(frqnt_dests$count, names.arg = frqnt_dests$dest, col = rainbow(7), main = "Top Flight Destinations from JFK", xlab = "Destination", ylab = "Count", beside = TRUE)
    

    The bar plot is shown as follows:

    Figure 10.8: Bar plot screenshot from SparkR
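
    If no graphical display is available (for example, over an SSH/PuTTY session), you can instead write the plot to a file with base R graphics devices and copy the image out of the VM; the file name here is just an example:
    > png("top_jfk_dests.png")
    > barplot(frqnt_dests$count, names.arg = frqnt_dests$dest, col = rainbow(7), main = "Top Flight Destinations from JFK", xlab = "Destination", ylab = "Count", beside = TRUE)
    > dev.off()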

  14. Though it's not recommended for large datasets (collect pulls the entire result into the driver's memory), you can convert the SparkR DataFrame to a local R data.frame using the following command:
    > localdf <- collect(jfk_dest)
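
    You can confirm that the result is a plain local data.frame, which now lives entirely in the driver's memory:
    > class(localdf)    # "data.frame" -- a local R object, not a SparkDataFrame
    > dim(localdf)      # number of rows and columns held locally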
    