Chapter 8. Integrating Scala with R and Python

While Spark provides MLlib as a library for machine learning, in many practical situations R or Python offer a more familiar and time-tested interface for statistical computations. In particular, R's extensive statistical libraries include the very popular ANOVA/MANOVA methods for analyzing variance and variable dependencies/independencies, sets of statistical tests, and random number generators that are not currently present in MLlib. The interface from R to Spark is available as the SparkR project. Finally, data analysts know Python's NumPy and SciPy linear algebra implementations for their efficiency, as well as other time-series, optimization, and signal processing packages. With R/Python integration, all these familiar functionalities can be exposed to Scala/Spark users until the Spark/MLlib interfaces stabilize and the libraries make their way into the new framework, while still benefiting from Spark's ability to execute workflows in a distributed way across multiple machines.

When people program in R or Python, or with any statistical or linear algebra packages for that matter, they are usually not specifically focusing on the functional programming aspects. As I mentioned in Chapter 1, Exploratory Data Analysis, Scala should be treated as a high-level language, and this is where it shines. Integration with highly efficient C and Fortran implementations, for example, the freely available Basic Linear Algebra Subprograms (BLAS), Linear Algebra Package (LAPACK), and Arnoldi Package (ARPACK), has already found its way into Java and thus Scala (http://www.netlib.org, https://github.com/fommil/netlib-java). I would like to leave Scala to what it does best. In this chapter, however, I will focus on how to use these languages with Scala/Spark.
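
To make this concrete, here is a minimal Scala sketch of calling BLAS through netlib-java, the same route MLlib takes; it assumes that the com.github.fommil.netlib:all artifact (and, optionally, a native BLAS) is on the classpath, which is an assumption and not part of the setup described in this chapter:

import com.github.fommil.netlib.BLAS

object BlasDot extends App {
  // getInstance() picks up a native implementation if one is found, otherwise the pure-Java fallback
  val blas = BLAS.getInstance()
  val x = Array(1.0, 2.0, 3.0)
  val y = Array(4.0, 5.0, 6.0)
  // ddot computes the dot product of x and y: 1*4 + 2*5 + 3*6 = 32.0
  println(blas.ddot(x.length, x, 1, y, 1))
}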

I will use the publicly available United States Department of Transportation flights dataset for this chapter (http://www.transtats.bts.gov).

In this chapter, we will cover the following topics:

  • Installing R and configuring SparkR if you haven't done so yet
  • Learning about R (and Spark) DataFrames
  • Performing linear regression and ANOVA analysis with R
  • Performing Generalized Linear Model (GLM) modeling with SparkR
  • Installing Python if you haven't done so yet
  • Learning how to use PySpark and call Python from Scala

Integrating with R

As with many advanced and carefully designed technologies, people usually either love or hate R as a language. One of the reasons is that R was one of the first language implementations that tried to manipulate complex objects, even though most of them turn out to be just lists, as opposed to the structs or maps of more mature modern implementations. R was originally created at the University of Auckland by Ross Ihaka and Robert Gentleman around 1993, and has its roots in the S language developed at Bell Labs around 1976, when most commercial programming was still done in Fortran. While R incorporates some functional features, such as passing functions as parameters and map/apply, it conspicuously misses some others, such as lazy evaluation and list comprehensions. With all this said, R has a very good help system, and if someone says that they never had to go back to the help(…) command to figure out how to run a certain data transformation or model better, they are either lying or just starting in R.

Setting up R and SparkR

To run SparkR, you'll need R version 3.0 or later. Follow the given instructions for the installation, depending on your operating system.

Linux

On a Linux system, detailed installation documentation is available at https://cran.r-project.org/bin/linux. For example, on a Debian system, R can be installed by running the following commands:

# apt-get update
...
# apt-get install r-base r-base-dev
...

To list the R packages installed or available from the Linux repositories, run the following command:

# apt-cache search "^r-.*" | sort
...

R packages, which are a part of r-base and r-recommended, are installed into the /usr/lib/R/library directory. These can be updated using the usual package maintenance tools such as apt-get or aptitude. The other R packages available as precompiled Debian packages, r-cran-* and r-bioc-*, are installed into /usr/lib/R/site-library. The following command shows all packages that depend on r-base-core:

# apt-cache rdepends r-base-core

This comprises a large number of contributed packages from CRAN and other repositories. If you want to install R packages that are not provided as Debian packages, or if you want to use newer versions, you need to build them from source, which requires the r-base-dev development package. It can be installed by the following command:

# apt-get install r-base-dev

This pulls in the basic requirements to compile R packages, much like a development tools group install. R packages may then be installed by the local user/admin from the CRAN source packages, typically from inside R using the install.packages() function or with R CMD INSTALL. For example, to install the R ggplot2 package, run the following command:

> install.packages("ggplot2")
--- Please select a CRAN mirror for use in this session ---
also installing the dependencies 'stringi', 'magrittr', 'colorspace', 'Rcpp', 'stringr', 'RColorBrewer', 'dichromat', 'munsell', 'labeling', 'digest', 'gtable', 'plyr', 'reshape2', 'scales'

This will download and optionally compile the package and its dependencies from one of the available sites. Sometimes R is confused about the repositories; in this case, I recommend creating a ~/.Rprofile file in your home directory pointing to the closest CRAN mirror:

$ cat >> ~/.Rprofile << EOF
r = getOption("repos") # hard code the Berkeley repo for CRAN
r["CRAN"] = "http://cran.cnr.berkeley.edu"
options(repos = r)
rm(r)

EOF

~/.Rprofile contains commands to customize your sessions. One of the commands I recommend putting there is options(prompt="R> "), so that you can tell which shell you are working in by the prompt, following the tradition of most tools in this book. The list of known mirrors is available at https://cran.r-project.org/mirrors.html.

Also, unless your OS setup already does so, it is good practice to specify the directories for system/site/user packages by putting the following commands into ~/.bashrc or the system-wide /etc/profile:

$ export R_LIBS_SITE=${R_LIBS_SITE:-/usr/local/lib/R/site-library:/usr/lib/R/site-library:/usr/lib/R/library}
$ export R_LIBS_USER=${R_LIBS_USER:-$HOME/R/$(uname -i)-library/$(R --version | grep -o -E '[0-9]+\.[0-9]+' | head -1)}

Mac OS

R for Mac OS can be downloaded, for example, from http://cran.r-project.org/bin/macosx. The latest version at the time of writing is 3.2.3. Always check the consistency of the downloaded package. To do so, run the following command:

$ pkgutil --check-signature R-3.2.3.pkg
Package "R-3.2.3.pkg":
   Status: signed by a certificate trusted by Mac OS X
   Certificate Chain:
    1. Developer ID Installer: Simon Urbanek
       SHA1 fingerprint: B7 EB 39 5E 03 CF 1E 20 D1 A6 2E 9F D3 17 90 26 D8 D6 3B EF
       -----------------------------------------------------------------------------
    2. Developer ID Certification Authority
       SHA1 fingerprint: 3B 16 6C 3B 7D C4 B7 51 C9 FE 2A FA B9 13 56 41 E3 88 E1 86
       -----------------------------------------------------------------------------
    3. Apple Root CA
       SHA1 fingerprint: 61 1E 5B 66 2C 59 3A 08 FF 58 D1 4A E2 24 52 D1 98 DF 6C 60

The environment settings in the preceding subsection also apply to the Mac OS setup.

Windows

R for Windows can be downloaded from https://cran.r-project.org/bin/windows/ as an exe installer. Run this executable as an administrator to install R.

One can usually edit the environment setting for System/User by following the Control Panel | System and Security | System | Advanced system settings | Environment Variables path from the Windows menu.

Running SparkR via scripts

To run SparkR, one needs to run the R/install-dev.sh script that comes with the Spark git tree. In fact, one only needs the shell script and the contents of the R/pkg directory, which are not always included with the compiled Spark distributions:

$ git clone https://github.com/apache/spark.git
Cloning into 'spark'...
remote: Counting objects: 301864, done.
...
$ cp -r R/{install-dev.sh,pkg} $SPARK_HOME/R
...
$ cd $SPARK_HOME
$ ./R/install-dev.sh
* installing *source* package 'SparkR' ...
** R
** inst
** preparing package for lazy loading
Creating a new generic function for 'colnames' in package 'SparkR'
...
$ bin/sparkR

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/alex/spark-1.6.1-bin-hadoop2.6/bin/spark-submit   "sparkr-shell" /tmp/RtmpgdTfmU/backend_port22446d0391e8 

 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.1
    /_/


 Spark context is available as sc, SQL context is available as sqlContext

Running Spark via R's command line

Alternatively, we can also initialize Spark from the R command line directly (or from RStudio at http://rstudio.org/) using the following commands:

R> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
...
R> sc <- sparkR.init(master = Sys.getenv("SPARK_MASTER"), sparkEnvir = list(spark.driver.memory="1g"))
...
R> sqlContext <- sparkRSQL.init(sc)

As described previously in Chapter 3, Working with Spark and MLlib, the SPARK_HOME environment variable needs to point to your local Spark installation directory, SPARK_MASTER to the desired cluster manager (local, standalone, Mesos, or YARN), and YARN_CONF_DIR to the YARN configuration directory if one is using Spark with the YARN cluster manager.

Although almost all of the distributions come with a UI, in the tradition of this book and for the purpose of this chapter, I'll use the command line.

DataFrames

The DataFrames originally came from R and Python, so it is natural to see them in SparkR.

Note

Please note that the implementation of DataFrames in SparkR is on top of RDDs, so they work differently from R's data.frame.

The question of when and where to store and apply the schema and other metadata, such as types, has been a topic of active debate recently. On one hand, providing the schema early with the data enables thorough data validation and potentially optimization. On the other hand, it may be too restrictive for the original data ingest, whose goal is just to capture as much data as possible and perform data formatting/cleansing later on, an approach often referred to as schema on read. The latter approach has recently won more ground with tools for working with evolving schemas, such as Avro, and automatic schema discovery tools, but for the purpose of this chapter, I'll assume that we have done the schema discovery part and can start working with DataFrames.
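
To make the distinction concrete, here is a minimal Scala sketch of both approaches in the spark-shell (the file names events.json and events.csv are hypothetical; sc and sqlContext are the objects predefined by the shell):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema on read: let Spark discover column names and types from the data itself
val discovered = sqlContext.read.json("events.json")
discovered.printSchema()

// Schema declared up front: validation can happen at ingest time
val schema = StructType(Seq(
  StructField("Origin", StringType, nullable = false),
  StructField("ArrDelayMinutes", DoubleType, nullable = true)))
val rows = sc.textFile("events.csv").map(_.split(",")).map {
  case Array(origin, delay) => Row(origin, delay.toDouble)
}
val declared = sqlContext.createDataFrame(rows, schema)
declared.printSchema()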

Let's first download and extract a flight delay dataset from the United States Department of Transportation, as follows:

$ wget http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_2015_7.zip
--2016-01-23 15:40:02--  http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_2015_7.zip
Resolving www.transtats.bts.gov... 204.68.194.70
Connecting to www.transtats.bts.gov|204.68.194.70|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 26204213 (25M) [application/x-zip-compressed]
Saving to: "On_Time_On_Time_Performance_2015_7.zip"

100%[====================================================================================================================================================================================>] 26,204,213   966K/s   in 27s     

2016-01-23 15:40:29 (956 KB/s) - "On_Time_On_Time_Performance_2015_7.zip" saved [26204213/26204213]

$ unzip -d flights On_Time_On_Time_Performance_2015_7.zip
Archive:  On_Time_On_Time_Performance_2015_7.zip
  inflating: flights/On_Time_On_Time_Performance_2015_7.csv  
  inflating: flights/readme.html

If you have Spark running on a cluster, you may want to copy the files to HDFS:

$ hadoop fs -put flights .

The flights/readme.html file gives you detailed metadata information, as shown in the following image:


Figure 08-1: Metadata provided with the On-Time Performance dataset released by the US Department of Transportation (for demo purposes only)

Now, I want to analyze the delays of flights arriving at SFO and possibly find the factors contributing to the delay. Let's start with the plain R data.frame:

$ bin/sparkR --master local[8]

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-apple-darwin13.4.0 (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

[Previously saved workspace restored]

Launching java with spark-submit command /Users/akozlov/spark-1.6.1-bin-hadoop2.6/bin/spark-submit   "--master" "local[8]" "sparkr-shell" /var/folders/p1/y7ygx_4507q34vhd60q115p80000gn/T//RtmpD42eTz/backend_port682e58e2c5db 

 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.1
    /_/


 Spark context is available as sc, SQL context is available as sqlContext
> flights <- read.table(unz("On_Time_On_Time_Performance_2015_7.zip", "On_Time_On_Time_Performance_2015_7.csv"), nrows=1000000, header=T, quote="\"", sep=",")
> sfoFlights <- flights[flights$Dest == "SFO", ]
> attach(sfoFlights)
> delays <- aggregate(ArrDelayMinutes ~ DayOfWeek + Origin + UniqueCarrier, FUN=mean, na.rm=TRUE)
> tail(delays[order(delays$ArrDelayMinutes), ])
    DayOfWeek Origin UniqueCarrier ArrDelayMinutes
220         4    ABQ            OO           67.60
489         4    TUS            OO           71.80
186         5    IAH            F9           77.60
696         3    RNO            UA           79.50
491         6    TUS            OO          168.25
84          7    SLC            AS          203.25

If you were flying from Salt Lake City on Sunday with Alaska Airlines in July 2015, consider yourself unlucky (we have only done simple analysis so far, so one shouldn't attach too much significance to this result). There may be multiple other random factors contributing to the delay.

Even though we ran the example in SparkR, we still used the R data.frame. If we want to analyze data across multiple months, we will need to distribute the load across multiple nodes. This is where the distributed SparkR DataFrame comes into play, as it can be spread across multiple threads even on a single node. There is a direct way to convert an R data.frame to a SparkR DataFrame (and thus to an RDD):

> sparkDf <- createDataFrame(sqlContext, flights)

If I run it on a laptop, I will run out of memory. The overhead is large because the data needs to be transferred between multiple threads/nodes, so we want to filter it as early as possible:

> sparkDf <- createDataFrame(sqlContext, subset(flights, select = c("ArrDelayMinutes", "DayOfWeek", "Origin", "Dest", "UniqueCarrier")))

This will run even on my laptop. There is, of course, a reverse conversion from Spark's DataFrame to R's data.frame:

> rDf <- as.data.frame(sparkDf)

Alternatively, I can use the spark-csv package to read the data from the .csv files, which, if the original .csv files are in a distributed filesystem such as HDFS, avoids shuffling the data over the network in a cluster setting. The only drawback, at least currently, is that Spark cannot read from .zip files directly:

$ ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.3.0 --master local[8]

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Warning: namespace 'SparkR' is not available and has been replaced
by .GlobalEnv when processing object 'sparkDf'
[Previously saved workspace restored]

Launching java with spark-submit command /home/alex/spark-1.6.1-bin-hadoop2.6/bin/spark-submit   "--master" "local[8]" "--packages" "com.databricks:spark-csv_2.10:1.3.0" "sparkr-shell" /tmp/RtmpfhcUXX/backend_port1b066bea5a03 
Ivy Default Cache set to: /home/alex/.ivy2/cache
The jars for the packages stored in: /home/alex/.ivy2/jars
:: loading settings :: url = jar:file:/home/alex/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
  confs: [default]
  found com.databricks#spark-csv_2.10;1.3.0 in central
  found org.apache.commons#commons-csv;1.1 in central
  found com.univocity#univocity-parsers;1.5.1 in central
:: resolution report :: resolve 189ms :: artifacts dl 4ms
  :: modules in use:
  com.databricks#spark-csv_2.10;1.3.0 from central in [default]
  com.univocity#univocity-parsers;1.5.1 from central in [default]
  org.apache.commons#commons-csv;1.1 from central in [default]
  ---------------------------------------------------------------------
  |                  |            modules            ||   artifacts   |
  |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
  ---------------------------------------------------------------------
  |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
  ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
  confs: [default]
  0 artifacts copied, 3 already retrieved (0kB/7ms)

 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.1
    /_/


 Spark context is available as sc, SQL context is available as sqlContext
> sparkDf <- read.df(sqlContext, "./flights", "com.databricks.spark.csv", header="true", inferSchema = "false")
> sfoFlights <- select(filter(sparkDf, sparkDf$Dest == "SFO"), "DayOfWeek", "Origin", "UniqueCarrier", "ArrDelayMinutes")
> aggs <- agg(group_by(sfoFlights, "DayOfWeek", "Origin", "UniqueCarrier"), count(sparkDf$ArrDelayMinutes), avg(sparkDf$ArrDelayMinutes))
> head(arrange(aggs, c('avg(ArrDelayMinutes)'), decreasing = TRUE), 10)
   DayOfWeek Origin UniqueCarrier count(ArrDelayMinutes) avg(ArrDelayMinutes)   
1          7    SLC            AS                      4               203.25
2          6    TUS            OO                      4               168.25
3          3    RNO            UA                      8                79.50
4          5    IAH            F9                      5                77.60
5          4    TUS            OO                      5                71.80
6          4    ABQ            OO                      5                67.60
7          2    ABQ            OO                      4                66.25
8          1    IAH            F9                      4                61.25
9          4    DAL            WN                      5                59.20
10         3    SUN            OO                      5                59.00
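
For comparison, since this is a Scala book after all, here is a minimal sketch of the same aggregation from the Scala spark-shell, started with the same --packages com.databricks:spark-csv_2.10:1.3.0 option (the column names come from the dataset above):

import org.apache.spark.sql.functions.{avg, count, desc}

val flights = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("flights")

val sfoFlights = flights
  .filter(flights("Dest") === "SFO")
  .select(flights("DayOfWeek"), flights("Origin"), flights("UniqueCarrier"),
    flights("ArrDelayMinutes").cast("double").as("ArrDelayMinutes"))

sfoFlights.groupBy("DayOfWeek", "Origin", "UniqueCarrier")
  .agg(count("ArrDelayMinutes"), avg("ArrDelayMinutes"))
  .orderBy(desc("avg(ArrDelayMinutes)"))
  .show(10)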

Note that we loaded the additional com.databricks:spark-csv_2.10:1.3.0 package by supplying the --packages flag on the command line; we can easily go distributed by using a Spark instance over a cluster of nodes, or even analyze a larger dataset:

$ for i in $(seq 1 6); do wget http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_2015_$i.zip; unzip -d flights On_Time_On_Time_Performance_2015_$i.zip; hadoop fs -put -f flights/On_Time_On_Time_Performance_2015_$i.csv flights; done

$ hadoop fs -ls flights
Found 7 items
-rw-r--r--   3 alex eng  211633432 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_1.csv
-rw-r--r--   3 alex eng  192791767 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_2.csv
-rw-r--r--   3 alex eng  227016932 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_3.csv
-rw-r--r--   3 alex eng  218600030 2016-02-16 03:28 flights/On_Time_On_Time_Performance_2015_4.csv
-rw-r--r--   3 alex eng  224003544 2016-02-16 03:29 flights/On_Time_On_Time_Performance_2015_5.csv
-rw-r--r--   3 alex eng  227418780 2016-02-16 03:29 flights/On_Time_On_Time_Performance_2015_6.csv
-rw-r--r--   3 alex eng  235037955 2016-02-15 21:56 flights/On_Time_On_Time_Performance_2015_7.csv

This will download and put the on-time performance data in the flights directory (remember, as we discussed in Chapter 1, Exploratory Data Analysis, we would like to treat directories as big data datasets). We can now run the same analysis over the whole period of 2015 (for the available data):

> sparkDf <- read.df(sqlContext, "./flights", "com.databricks.spark.csv", header="true")
> sfoFlights <- select(filter(sparkDf, sparkDf$Dest == "SFO"), "DayOfWeek", "Origin", "UniqueCarrier", "ArrDelayMinutes")
> aggs <- cache(agg(group_by(sfoFlights, "DayOfWeek", "Origin", "UniqueCarrier"), count(sparkDf$ArrDelayMinutes), avg(sparkDf$ArrDelayMinutes)))
> head(arrange(aggs, c('avg(ArrDelayMinutes)'), decreasing = TRUE), 10)
   DayOfWeek Origin UniqueCarrier count(ArrDelayMinutes) avg(ArrDelayMinutes)   
1          6    MSP            UA                      1            122.00000
2          3    RNO            UA                      8             79.50000
3          1    MSP            UA                     13             68.53846
4          7    SAT            UA                      1             65.00000
5          7    STL            UA                      9             64.55556
6          1    ORD            F9                     13             55.92308
7          1    MSO            OO                      4             50.00000
8          2    MSO            OO                      4             48.50000
9          5    CEC            OO                     28             45.86957
10         3    STL            UA                     13             43.46154

Note that we used a cache() call to pin the dataset in memory, as we will use it again later. This time it's Minneapolis/United on Saturday! However, you probably already see why: there is only one record for this combination of DayOfWeek, Origin, and UniqueCarrier; it's most likely an outlier. The average over about 30 flights for the previous outlier was reduced to about 33 minutes:

> head(arrange(filter(filter(aggs, aggs$Origin == "SLC"), aggs$UniqueCarrier == "AS"), c('avg(ArrDelayMinutes)'), decreasing = TRUE), 100)
  DayOfWeek Origin UniqueCarrier count(ArrDelayMinutes) avg(ArrDelayMinutes)
1         7    SLC            AS                     30            32.600000
2         2    SLC            AS                     30            10.200000
3         4    SLC            AS                     31             9.774194
4         1    SLC            AS                     30             9.433333
5         3    SLC            AS                     30             5.866667
6         5    SLC            AS                     31             5.516129
7         6    SLC            AS                     30             2.133333

Sunday still remains a problem in terms of delays. The limit to the amount of data we can analyze now is only the number of cores on the laptop and nodes in the cluster. Let's look at more complex machine learning models now.

Linear models

Linear methods play an important role in statistical modeling. As the name suggests, a linear model assumes that the dependent variable is a weighted combination of independent variables. In R, the lm function performs a linear regression and reports the coefficients, as follows:

R> attach(iris)
R> lm(Sepal.Length ~ Sepal.Width)

Call:
lm(formula = Sepal.Length ~ Sepal.Width)

Coefficients:
(Intercept)  Sepal.Width
     6.5262      -0.2234

The summary function provides even more information:

R> model <- lm(Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width)
R> summary(model)

Call:
lm(formula = Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width)

Residuals:
     Min       1Q   Median       3Q      Max 
-0.82816 -0.21989  0.01875  0.19709  0.84570 

Coefficients:
             Estimate Std. Error t value Pr(>|t|)    
(Intercept)   1.85600    0.25078   7.401 9.85e-12 ***
Sepal.Width   0.65084    0.06665   9.765  < 2e-16 ***
Petal.Length  0.70913    0.05672  12.502  < 2e-16 ***
Petal.Width  -0.55648    0.12755  -4.363 2.41e-05 ***
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

Residual standard error: 0.3145 on 146 degrees of freedom
Multiple R-squared:  0.8586,  Adjusted R-squared:  0.8557 
F-statistic: 295.5 on 3 and 146 DF,  p-value: < 2.2e-16

While we considered generalized linear models in Chapter 3, Working with Spark and MLlib, and we will also consider the glm implementation in R and SparkR shortly, linear models provide more information in general and are an excellent tool for working with noisy data and selecting the relevant attributes for further analysis.

Note

Data analysis life cycle

While most statistical books focus on the analysis and the best use of the available data, the results of statistical analysis should, in general, also affect the search for new sources of information. In the complete data life cycle, discussed at the end of Chapter 3, Working with Spark and MLlib, a data scientist should always translate the latest variable importance results into theories of how to collect data. For example, if an ink usage analysis for home printers points to an increase in ink usage for photos, one could potentially collect more information about the format of the pictures, the sources of the digital images, and the paper the user prefers to use. This approach turned out to be very productive in a real business situation, even though it was not fully automated.

Specifically, here is a short description of the output that linear models provide:

  • Residuals: These are statistics of the differences between the actual and predicted values. A lot of techniques exist to detect problems with a model based on patterns in the residual distribution, but this is out of the scope of this book. A detailed residual table can be obtained with the resid(model) function.
  • Coefficients: These are the actual linear combination coefficients; the t-value represents the ratio of the value of the coefficient to the estimate of its standard error: higher values mean a higher likelihood that this coefficient has a non-trivial effect on the dependent variable. The coefficients can also be obtained with the coef(model) function.
  • Residual standard error: This reports the standard deviation of the residuals, that is, the square root of the mean squared error that is the target of optimization in a straightforward linear regression.
  • Multiple R-squared: This is the fraction of the dependent variable variance that is explained by the model. The adjusted value accounts for the number of parameters in your model and is considered a better metric to avoid overfitting if the number of observations does not justify the complexity of the model, which happens even for big data problems (see the formulas after this list).
  • F-statistic: The measure of model quality. In plain terms, it measures how well all the parameters in the model explain the dependent variable. The associated p-value provides the probability that the model explains the dependent variable just by random chance. Values under 0.05 (or 5%) are, in general, considered satisfactory. While a high p-value probably means that the model is not statistically valid and "nothing else matters", a low p-value does not always mean that the model will work well in practice, so it cannot be applied directly as a model acceptance criterion.
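
For reference, here is how these quantities relate for a model with p predictors fit on n observations (standard definitions, not specific to R), where RSS is the residual sum of squares and TSS is the total sum of squares of the dependent variable:

R^2 = 1 - RSS/TSS
R^2_{adj} = 1 - (1 - R^2) (n - 1)/(n - p - 1)
F = (R^2/p) / ((1 - R^2)/(n - p - 1))

Plugging in the iris summary above (R^2 = 0.8586, n = 150, p = 3) gives an adjusted R-squared of about 0.8557 and an F-statistic of about 295.5, matching the reported values.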

Once the linear models are applied, usually more complex glm or recursive models, such as decision trees and the rpart function, are run to find interesting variable interactions. Linear models are good for establishing a baseline that the other models can improve on.

Finally, ANOVA is a standard technique for studying the variance when the independent variables are discrete:

R> aov <- aov(Sepal.Length ~ Species)
R> summary(aov)
             Df Sum Sq Mean Sq F value Pr(>F)    
Species       2  63.21  31.606   119.3 <2e-16 ***
Residuals   147  38.96   0.265                   
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

The measure of the model quality is again the F-statistic. While one can always run R algorithms on an RDD using the pipe mechanism with Rscript (a minimal Scala sketch is shown below), I will partially cover this functionality with respect to Java Specification Request (JSR) 223 Python integration later. In this section, I would like to explore specifically the generalized linear regression glm function that is implemented natively in both R and SparkR.
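
Here is that pipe sketch; double.R stands for a hypothetical executable Rscript that reads numbers from stdin and prints each of them doubled, one per line:

// one external R process is started per partition; lines are exchanged over stdin/stdout
val input = sc.parallelize(1 to 10, 2).map(_.toString)
val doubled = input.pipe("./double.R")
doubled.collect().foreach(println)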

Generalized linear model

Once again, you can run either R glm or SparkR glm. The list of possible variance and link functions for the R implementation is provided in the following table:

Family              Variance            Link
gaussian            gaussian            identity
binomial            binomial            logit, probit or cloglog
poisson             poisson             log, identity or sqrt
Gamma               Gamma               inverse, identity or log
inverse.gaussian    inverse.gaussian    1/mu^2
quasi               user-defined        user-defined

I will use a binary target, ArrDel15, which indicates whether the plane was more than 15 minutes late for the arrival. The independent variables will be DepDel15, DayOfWeek, Month, UniqueCarrier, Origin, and Dest:

R> flights <- read.table(unz("On_Time_On_Time_Performance_2015_7.zip", "On_Time_On_Time_Performance_2015_7.csv"), nrows=1000000, header=T, quote="\"", sep=",")
R> flights$DoW_ <- factor(flights$DayOfWeek,levels=c(1,2,3,4,5,6,7), labels=c("Mon","Tue","Wed","Thu","Fri","Sat","Sun"))
R> attach(flights)
R> system.time(model <- glm(ArrDel15 ~ UniqueCarrier + DoW_ + Origin + Dest, flights, family="binomial"))

While you wait for the results, open another shell and run glm in the SparkR mode on the full seven months of data:

sparkR> cache(sparkDf <- read.df(sqlContext, "./flights", "com.databricks.spark.csv", header="true", inferSchema="true"))
DataFrame[Year:int, Quarter:int, Month:int, DayofMonth:int, DayOfWeek:int, FlightDate:string, UniqueCarrier:string, AirlineID:int, Carrier:string, TailNum:string, FlightNum:int, OriginAirportID:int, OriginAirportSeqID:int, OriginCityMarketID:int, Origin:string, OriginCityName:string, OriginState:string, OriginStateFips:int, OriginStateName:string, OriginWac:int, DestAirportID:int, DestAirportSeqID:int, DestCityMarketID:int, Dest:string, DestCityName:string, DestState:string, DestStateFips:int, DestStateName:string, DestWac:int, CRSDepTime:int, DepTime:int, DepDelay:double, DepDelayMinutes:double, DepDel15:double, DepartureDelayGroups:int, DepTimeBlk:string, TaxiOut:double, WheelsOff:int, WheelsOn:int, TaxiIn:double, CRSArrTime:int, ArrTime:int, ArrDelay:double, ArrDelayMinutes:double, ArrDel15:double, ArrivalDelayGroups:int, ArrTimeBlk:string, Cancelled:double, CancellationCode:string, Diverted:double, CRSElapsedTime:double, ActualElapsedTime:double, AirTime:double, Flights:double, Distance:double, DistanceGroup:int, CarrierDelay:double, WeatherDelay:double, NASDelay:double, SecurityDelay:double, LateAircraftDelay:double, FirstDepTime:int, TotalAddGTime:double, LongestAddGTime:double, DivAirportLandings:int, DivReachedDest:double, DivActualElapsedTime:double, DivArrDelay:double, DivDistance:double, Div1Airport:string, Div1AirportID:int, Div1AirportSeqID:int, Div1WheelsOn:int, Div1TotalGTime:double, Div1LongestGTime:double, Div1WheelsOff:int, Div1TailNum:string, Div2Airport:string, Div2AirportID:int, Div2AirportSeqID:int, Div2WheelsOn:int, Div2TotalGTime:double, Div2LongestGTime:double, Div2WheelsOff:string, Div2TailNum:string, Div3Airport:string, Div3AirportID:string, Div3AirportSeqID:string, Div3WheelsOn:string, Div3TotalGTime:string, Div3LongestGTime:string, Div3WheelsOff:string, Div3TailNum:string, Div4Airport:string, Div4AirportID:string, Div4AirportSeqID:string, Div4WheelsOn:string, Div4TotalGTime:string, Div4LongestGTime:string, Div4WheelsOff:string, Div4TailNum:string, Div5Airport:string, Div5AirportID:string, Div5AirportSeqID:string, Div5WheelsOn:string, Div5TotalGTime:string, Div5LongestGTime:string, Div5WheelsOff:string, Div5TailNum:string, :string]
sparkR> noNulls <- cache(dropna(selectExpr(filter(sparkDf, sparkDf$Cancelled == 0), "ArrDel15", "UniqueCarrier", "format_string('%d', DayOfWeek) as DayOfWeek", "Origin", "Dest"), "any"))
sparkR> sparkModel = glm(ArrDel15 ~ UniqueCarrier + DayOfWeek + Origin + Dest, noNulls, family="binomial")

Here we try to build a model explaining delays as an effect of carrier, day of week, and origin and destination airports, which is captured by the formula construct ArrDel15 ~ UniqueCarrier + DayOfWeek + Origin + Dest.

Note

Nulls, big data, and Scala

Note that in the SparkR case of glm, I had to explicitly filter out the cancelled flights and remove the NAs, or nulls in C/Java lingo. While R does this for you by default, NAs in big data are very common, as the datasets are typically sparse, and they shouldn't be treated lightly. The fact that we have to deal with nulls explicitly in MLlib alerts us to some additional information in the dataset and is definitely a welcome feature. The presence of an NA can carry information about the way the data was collected. Ideally, each NA should be accompanied by a small get_na_info method explaining why this particular value was not available or collected, which leads us to the Either type in Scala.

Even though nulls are inherited from Java and are a part of Scala, the Option and Either types are newer and more robust mechanisms for dealing with the special cases where nulls were traditionally used. Specifically, Either can provide either a value or an exception message explaining why it was not computed, while Option can either provide a value or be None; both are readily handled by the Scala pattern-matching framework.
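
Here is a minimal sketch of the idea; parseDelay is a hypothetical helper standing in for the get_na_info method mentioned above:

def parseDelay(s: String): Either[String, Double] =
  if (s == null || s.isEmpty || s == "NA") Left("value was not collected")
  else try Right(s.toDouble)
  catch { case _: NumberFormatException => Left(s"unparseable value: $s") }

Seq("15.0", "NA", "7.5", "oops").map(parseDelay).foreach {
  case Right(delay) => println(s"delay = $delay minutes")
  case Left(reason) => println(s"missing: $reason")
}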

One thing you will notice is that SparkR runs multiple threads; even on a single node, it consumes CPU time on multiple cores and returns much faster, even with a larger amount of data. In my experiment on a 32-core machine, it was able to finish in under a minute (as opposed to 35 minutes for R glm). To get the results, as in the R model case, we need to run the summary() method:

> summary(sparkModel)
$coefficients
                     Estimate
(Intercept)      -1.518542340
UniqueCarrier_WN  0.382722232
UniqueCarrier_DL -0.047997652
UniqueCarrier_OO  0.367031995
UniqueCarrier_AA  0.046737727
UniqueCarrier_EV  0.344539788
UniqueCarrier_UA  0.299290120
UniqueCarrier_US  0.069837542
UniqueCarrier_MQ  0.467597761
UniqueCarrier_B6  0.326240578
UniqueCarrier_AS -0.210762769
UniqueCarrier_NK  0.841185903
UniqueCarrier_F9  0.788720078
UniqueCarrier_HA -0.094638586
DayOfWeek_5       0.232234937
DayOfWeek_4       0.274016179
DayOfWeek_3       0.147645473
DayOfWeek_1       0.347349366
DayOfWeek_2       0.190157420
DayOfWeek_7       0.199774806
Origin_ATL       -0.180512251
...

The worst performer is NK (Spirit Airlines). Internally, SparkR uses limited-memory BFGS, a limited-memory quasi-Newton optimization method; the results are similar to those obtained with the R glm on the July data:

R> summary(model)

Call:
glm(formula = ArrDel15 ~ UniqueCarrier + DoW + Origin + Dest, 
    family = "binomial", data = dow)

Deviance Residuals: 
    Min       1Q   Median       3Q      Max  
-1.4205  -0.7274  -0.6132  -0.4510   2.9414  

Coefficients:
                  Estimate Std. Error z value Pr(>|z|)    
(Intercept)     -1.817e+00  2.402e-01  -7.563 3.95e-14 ***
UniqueCarrierAS -3.296e-01  3.413e-02  -9.658  < 2e-16 ***
UniqueCarrierB6  3.932e-01  2.358e-02  16.676  < 2e-16 ***
UniqueCarrierDL -6.602e-02  1.850e-02  -3.568 0.000359 ***
UniqueCarrierEV  3.174e-01  2.155e-02  14.728  < 2e-16 ***
UniqueCarrierF9  6.754e-01  2.979e-02  22.668  < 2e-16 ***
UniqueCarrierHA  7.883e-02  7.058e-02   1.117 0.264066    
UniqueCarrierMQ  2.175e-01  2.393e-02   9.090  < 2e-16 ***
UniqueCarrierNK  7.928e-01  2.702e-02  29.343  < 2e-16 ***
UniqueCarrierOO  4.001e-01  2.019e-02  19.817  < 2e-16 ***
UniqueCarrierUA  3.982e-01  1.827e-02  21.795  < 2e-16 ***
UniqueCarrierVX  9.723e-02  3.690e-02   2.635 0.008423 ** 
UniqueCarrierWN  6.358e-01  1.700e-02  37.406  < 2e-16 ***
dowTue           1.365e-01  1.313e-02  10.395  < 2e-16 ***
dowWed           1.724e-01  1.242e-02  13.877  < 2e-16 ***
dowThu           4.593e-02  1.256e-02   3.656 0.000256 ***
dowFri          -2.338e-01  1.311e-02 -17.837  < 2e-16 ***
dowSat          -2.413e-01  1.458e-02 -16.556  < 2e-16 ***
dowSun          -3.028e-01  1.408e-02 -21.511  < 2e-16 ***
OriginABI       -3.355e-01  2.554e-01  -1.314 0.188965    
...

Other parameters of the SparkR glm implementation are described in the following list:

  • formula: A symbolic description, as in R. Currently, only a subset of the formula operators is supported: '~', '.', ':', '+', and '-'.
  • family: gaussian or binomial; needs to be in quotes. gaussian implies linear regression and binomial implies logistic regression.
  • data: The DataFrame to fit; needs to be a SparkR DataFrame, not an R data.frame.
  • lambda: Positive; the regularization coefficient.
  • alpha: Positive; the elastic-net mixing parameter (refer to glmnet's documentation for details).
  • standardize: TRUE or FALSE; user-defined.
  • solver: l-bfgs, normal, or auto. auto will choose the algorithm automatically, l-bfgs means limited-memory BFGS, and normal means using the normal equation as an analytical solution to the linear regression problem.

Reading JSON files in SparkR

Schema on Read is one of the convenient features of big data. The DataFrame class has the ability to figure out the schema of a text file containing a JSON record per line:

[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ cat examples/src/main/resources/people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ bin/sparkR
...

> people = read.json(sqlContext, "examples/src/main/resources/people.json")
> dtypes(people)
[[1]]
[1] "age"    "bigint"

[[2]]
[1] "name"   "string"

> schema(people)
StructType
|-name = "age", type = "LongType", nullable = TRUE
|-name = "name", type = "StringType", nullable = TRUE
> showDF(people)
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
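
The same schema discovery works from the Scala spark-shell, where sqlContext is also predefined; a minimal sketch:

val people = sqlContext.read.json("examples/src/main/resources/people.json")
people.printSchema()   // age: long (nullable = true), name: string (nullable = true)
people.show()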

Writing Parquet files in SparkR

As we mentioned in the previous chapter, the Parquet format is an efficient storage format, particularly for low cardinality columns. Parquet files can be read/written directly from R:

> write.parquet(sparkDf, "parquet")

You can see that the new Parquet file is 66 times smaller than the original zip file downloaded from the DoT:

[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ ls -l On_Time_On_Time_Performance_2015_7.zip parquet/ flights/
-rw-r--r--  1 akozlov  staff  26204213 Sep  9 12:21 /Users/akozlov/spark/On_Time_On_Time_Performance_2015_7.zip

flights/:
total 459088
-rw-r--r--  1 akozlov  staff  235037955 Sep  9 12:20 On_Time_On_Time_Performance_2015_7.csv
-rw-r--r--  1 akozlov  staff      12054 Sep  9 12:20 readme.html

parquet/:
total 848
-rw-r--r--  1 akozlov  staff       0 Jan 24 22:50 _SUCCESS
-rw-r--r--  1 akozlov  staff   10000 Jan 24 22:50 _common_metadata
-rw-r--r--  1 akozlov  staff   23498 Jan 24 22:50 _metadata
-rw-r--r--  1 akozlov  staff  394418 Jan 24 22:50 part-r-00000-9e2d0004-c71f-4bf5-aafe-90822f9d7223.gz.parquet
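
Since Parquet stores the schema in its own metadata, the directory can be read back without specifying the schema again, from SparkR or, as in this minimal sketch, from the Scala spark-shell:

val flightsParquet = sqlContext.read.parquet("parquet")
flightsParquet.printSchema()
flightsParquet.count()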

Invoking Scala from R

Let's assume that one has an exceptional implementation of a numeric method in Scala that we want to call from R. One way of doing this would be to use the R system() function, which invokes /bin/sh on Unix-like systems. However, the rscala package is a more efficient way: it starts a Scala interpreter and maintains communication over a TCP/IP network connection.

The Scala interpreter maintains state (memoization) between the calls; for example, one can define functions, as follows:

R> scala <- scalaInterpreter()
R> scala %~% 'def pri(i: Stream[Int]): Stream[Int] = i.head #:: pri(i.tail filter  { x => { println("Evaluating " + x + "%" + i.head); x % i.head != 0 } } )'
ScalaInterpreterReference... engine: javax.script.ScriptEngine
R> scala %~% 'val primes = pri(Stream.from(2))'
ScalaInterpreterReference... primes: Stream[Int]
R> scala %~% 'primes take 5 foreach println'
2
Evaluating 3%2
3
Evaluating 4%2
Evaluating 5%2
Evaluating 5%3
5
Evaluating 6%2
Evaluating 7%2
Evaluating 7%3
Evaluating 7%5
7
Evaluating 8%2
Evaluating 9%2
Evaluating 9%3
Evaluating 10%2
Evaluating 11%2
Evaluating 11%3
Evaluating 11%5
Evaluating 11%7
11
R> scala %~% 'primes take 5 foreach println'
2
3
5
7
11
R> scala %~% 'primes take 7 foreach println'
2
3
5
7
11
Evaluating 12%2
Evaluating 13%2
Evaluating 13%3
Evaluating 13%5
Evaluating 13%7
Evaluating 13%11
13
Evaluating 14%2
Evaluating 15%2
Evaluating 15%3
Evaluating 16%2
Evaluating 17%2
Evaluating 17%3
Evaluating 17%5
Evaluating 17%7
Evaluating 17%11
Evaluating 17%13
17
R> 

R can be invoked from Scala using the ! or !! operators of the sys.process package and the Rscript command:

[akozlov@Alexanders-MacBook-Pro ~]$ cat << EOF > rdate.R
> #!/usr/local/bin/Rscript
> 
> write(date(), stdout())
> EOF
[akozlov@Alexanders-MacBook-Pro ~]$ chmod a+x rdate.R
[akozlov@Alexanders-MacBook-Pro ~]$ scala
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import sys.process._
import sys.process._

scala> val date = Process(Seq("./rdate.R")).!!
date: String =
"Wed Feb 24 02:20:09 2016
"

Using Rserve

A more efficient way is to use a similar TCP/IP binary transport protocol to communicate with R via Rserve and its clients, such as RSclient (http://www.rforge.net/Rserve). To start Rserve on a node that has R installed, perform the following actions:

[akozlov@Alexanders-MacBook-Pro ~]$ wget http://www.rforge.net/Rserve/snapshot/Rserve_1.8-5.tar.gz

[akozlov@Alexanders-MacBook-Pro ~]$ R CMD INSTALL Rserve_1.8-5.tar.gz
...

[akozlov@Alexanders-MacBook-Pro ~]$ R -q CMD Rserve

R version 3.2.3 (2015-12-10) -- "Wooden Christmas-Tree"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-apple-darwin13.4.0 (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Rserv started in daemon mode.

By default, Rserve opens a connection on localhost:6311. The advantage of the binary network protocol is that it is platform-independent, and multiple clients, local or remote, can connect to the server and communicate with R.

Note that, while passing the results as a binary object has its advantages, you have to be careful with the type mappings between R and Scala. Rserve supports other clients, including Python, but I will also cover JSR 223-compliant scripting at the end of this chapter.
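
From Scala, one can talk to this server with the Rserve Java client; the following is a minimal sketch, assuming the client jars from http://www.rforge.net/Rserve (the REngine and Rserve client libraries) are on the classpath and Rserve is listening on the default localhost:6311:

import org.rosuda.REngine.Rserve.RConnection

val connection = new RConnection()   // connects to localhost:6311 by default
try {
  // evaluate arbitrary R code on the server and pull the result back as a double
  val mean = connection.eval("mean(rnorm(1000))").asDouble()
  println(s"mean of 1000 N(0,1) draws: $mean")
} finally {
  connection.close()
}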
