Integrating with Python

Python has steadily established itself as a de facto tool for data science. It has a command-line interface and decent visualization via matplotlib and ggplot, which is based on R's ggplot2. Recently, Wes McKinney, the creator of pandas, the data-analysis and time series package, joined Cloudera to pave the way for Python in big data.

Setting up Python

Python is usually part of the default installation. Spark requires version 2.7.0+.

If you don't have Python on Mac OS, I recommend installing the Homebrew package manager from http://brew.sh:

[akozlov@Alexanders-MacBook-Pro spark(master)]$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
==> This script will install:
/usr/local/bin/brew
/usr/local/Library/...
/usr/local/share/man/man1/brew.1

[akozlov@Alexanders-MacBook-Pro spark(master)]$ brew install python

Otherwise, on a Unix-like system, Python can be compiled from the source distribution:

$ export PYTHON_VERSION=2.7.11
$ wget -O - https://www.python.org/ftp/python/$PYTHON_VERSION/Python-$PYTHON_VERSION.tgz | tar xzvf -
$ cd $HOME/Python-$PYTHON_VERSION
$ ./configure --prefix=/usr/local --enable-unicode=ucs4 --enable-shared LDFLAGS="-Wl,-rpath /usr/local/lib"
$ make; sudo make altinstall
$ sudo ln -sf /usr/local/bin/python2.7 /usr/local/bin/python

It is good practice to place it in a directory different from the default Python installation. It is normal to have multiple versions of Python on a single system, and this usually does not lead to problems, as Python keeps the installation directories separate. For the purposes of this chapter, as for many machine learning tasks, I'll also need a few packages. The packages and specific versions may differ across installations:

$ wget https://bootstrap.pypa.io/ez_setup.py
$ sudo /usr/local/bin/python ez_setup.py
$ sudo /usr/local/bin/easy_install-2.7 pip
$ sudo /usr/local/bin/pip install --upgrade avro nose numpy scipy pandas statsmodels scikit-learn iso8601 python-dateutil python-snappy

If everything compiles—SciPy uses a Fortran compiler and libraries for linear algebra—we are ready to use Python 2.7.11!

Note

Note that if one wants to use Python with Spark's pipe command in a distributed environment (covered later in this chapter), Python needs to be installed on every node in the cluster.
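A quick way to verify this once a Spark shell is running is to launch one small task per partition and report each worker's Python version. The following is only a sketch that uses the Spark pipe mechanism described later in this chapter; the python binary name is an assumption, so point it at whatever interpreter your workers should be using:

scala> sc.parallelize(1 to 100, 100).pipe(Seq("python", "-c", "import socket, sys; sys.stdin.read(); print(socket.gethostname() + ' ' + sys.version.split()[0])")).distinct.collect.foreach(println)

The sys.stdin.read() call simply drains the partition data that Spark pipes into each process before the hostname and Python version are printed; distinct then leaves one line per unique host/version combination.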

PySpark

Just as bin/sparkR launches R with a preloaded Spark context, bin/pyspark launches a Python shell with a preloaded Spark context and a running Spark driver. The PYSPARK_PYTHON environment variable can be used to point to a specific Python version:

[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ export PYSPARK_PYTHON=/usr/local/bin/python
[akozlov@Alexanders-MacBook-Pro spark-1.6.1-bin-hadoop2.6]$ bin/pyspark 
Python 2.7.11 (default, Jan 23 2016, 20:14:24) 
[GCC 4.2.1 Compatible Apple LLVM 7.0.2 (clang-700.1.81)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.11 (default, Jan 23 2016 20:14:24)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

PySpark directly supports most of the MLlib functionality on Spark RDDs (http://spark.apache.org/docs/latest/api/python), but it is known to lag a few releases behind the Scala API. As of release 1.6.0, it also supports DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html):

>>> import pyspark.sql.functions as func
>>> sfoFlights = sqlContext.sql("SELECT Dest, UniqueCarrier, ArrDelayMinutes FROM parquet.parquet")
>>> sfoFlights.groupBy(["Dest", "UniqueCarrier"]).agg(func.avg("ArrDelayMinutes"), func.count("ArrDelayMinutes")).sort("avg(ArrDelayMinutes)", ascending=False).head(5)
[Row(Dest=u'HNL', UniqueCarrier=u'HA', avg(ArrDelayMinutes)=53.70967741935484, count(ArrDelayMinutes)=31), Row(Dest=u'IAH', UniqueCarrier=u'F9', avg(ArrDelayMinutes)=43.064516129032256, count(ArrDelayMinutes)=31), Row(Dest=u'LAX', UniqueCarrier=u'DL', avg(ArrDelayMinutes)=39.68691588785047, count(ArrDelayMinutes)=214), Row(Dest=u'LAX', UniqueCarrier=u'WN', avg(ArrDelayMinutes)=29.704453441295545, count(ArrDelayMinutes)=247), Row(Dest=u'MSO', UniqueCarrier=u'OO', avg(ArrDelayMinutes)=29.551724137931036, count(ArrDelayMinutes)=29)]

Calling Python from Java/Scala

As this is really a book about Scala, we should also mention that one can call Python code and its interpreter directly from Scala (or Java). There are a few options available that will be discussed in this chapter.

Using sys.process._

Scala, as well as Java, can call OS processes by spawning a separate process, which we already used for interactive analysis in Chapter 1, Exploratory Data Analysis: the .! method will start the process and return the exit code, while .!! will return a string containing the output:

scala> import sys.process._
import sys.process._

scala> val retCode = Process(Seq("/usr/local/bin/python", "-c", "import socket; print(socket.gethostname())")).!
Alexanders-MacBook-Pro.local
retCode: Int = 0

scala> val lines = Process(Seq("/usr/local/bin/python", "-c", """from datetime import datetime, timedelta; print("Yesterday was {}".format(datetime.now()-timedelta(days=1)))""")).!!
lines: String =
"Yesterday was 2016-02-12 16:24:53.161853
"

Let's try a more complex SVD computation (similar to the one we used in the SVD++ recommendation engine, but this time it invokes the BLAS/LAPACK libraries on the backend). I created a Python executable that takes a string representing a matrix and the required rank as input, and outputs an SVD approximation of the given rank:

#!/usr/bin/env python

import sys
import os
import re

import numpy as np
from scipy import linalg
from scipy.linalg import svd

np.set_printoptions(linewidth=10000)

def process_line(input):
    # each input line is "matrix|rank"; if the rank is missing, default to 1
    inp = input.rstrip("\n")
    if len(inp) > 1:
        try:
            (mat, rank) = inp.split("|")
            a = np.matrix(mat)
            r = int(rank)
        except:
            a = np.matrix(inp)
            r = 1
        # zero out all but the first r singular values
        U, s, Vh = linalg.svd(a, full_matrices=False)
        for i in xrange(r, s.size):
            s[i] = 0
        # reassemble the rank-r approximation and print it on a single line
        S = linalg.diagsvd(s, s.size, s.size)
        print(str(np.dot(U, np.dot(S, Vh))).replace(os.linesep, ";"))

if __name__ == '__main__':
    map(process_line, sys.stdin)

Let's call it svd.py and put it in the current directory. Given a matrix and a rank as input, it produces an approximation of the given rank:

$ echo -e "1,2,3;2,1,2;3,2,1;7,8,9|3" | ./svd.py
[[ 1.  2.  3.]; [ 2.  1.  2.]; [ 3.  2.  1.]; [ 7.  8.  9.]]

To call it from Scala, let's define the following #<<< method in our DSL:

scala> implicit class RunCommand(command: String) {
     |   def #<<< (input: String)(implicit buffer: StringBuilder) =  {
     |     val process = Process(command)
     |     val io = new ProcessIO (
     |       in  => { in.write(input getBytes "UTF-8"); in.close},
     |       out => { buffer append scala.io.Source.fromInputStream(out).getLines.mkString("\n"); buffer.append("\n"); out.close() },
     |       err => { scala.io.Source.fromInputStream(err).getLines().foreach(System.err.println) })
     |     (process run io).exitValue
     |   }
     | }
defined class RunCommand

Now, we can use the #<<< operator to call Python's SVD method:

scala> implicit val buffer = new StringBuilder()
buffer: StringBuilder =

scala> if ("./svd.py" #<<< "1,2,3;2,1,2;3,2,1;7,8,9|1" == 0)  Some(buffer.toString) else None
res77: Option[String] = Some([[ 1.84716691  2.02576751  2.29557674]; [ 1.48971176  1.63375041  1.85134741]; [ 1.71759947  1.88367234  2.13455611]; [ 7.19431647  7.88992728  8.94077601]])

Note that as we requested the resulting matrix rank to be one, all rows and columns are linearly dependent. We can even pass several lines of input at a time, as follows:

scala> if ("./svd.py" #<<< """
     | 1,2,3;2,1,2;3,2,1;7,8,9|0
     | 1,2,3;2,1,2;3,2,1;7,8,9|1
     | 1,2,3;2,1,2;3,2,1;7,8,9|2
     | 1,2,3;2,1,2;3,2,1;7,8,9|3""" == 0) Some(buffer.toString) else None
res80: Option[String] =
Some([[ 0.  0.  0.]; [ 0.  0.  0.]; [ 0.  0.  0.]; [ 0.  0.  0.]]
[[ 1.84716691  2.02576751  2.29557674]; [ 1.48971176  1.63375041  1.85134741]; [ 1.71759947  1.88367234  2.13455611]; [ 7.19431647  7.88992728  8.94077601]]
[[ 0.9905897   2.02161614  2.98849663]; [ 1.72361156  1.63488399  1.66213642]; [ 3.04783513  1.89011928  1.05847477]; [ 7.04822694  7.88921926  9.05895373]]
[[ 1.  2.  3.]; [ 2.  1.  2.]; [ 3.  2.  1.]; [ 7.  8.  9.]])

Spark pipe

SVD is usually a pretty heavy operation, so the relative overhead of calling Python in this case is small. We can avoid even that overhead if we keep the process running and supply several lines at a time, as we did in the last example. Both Hadoop MR and Spark implement this approach. For example, in Spark, the whole computation takes only one line, as shown in the following:

scala> sc.parallelize(List("1,2,3;2,1,2;3,2,1;7,8,9|0", "1,2,3;2,1,2;3,2,1;7,8,9|1", "1,2,3;2,1,2;3,2,1;7,8,9|2", "1,2,3;2,1,2;3,2,1;7,8,9|3"),4).pipe("./svd.py").collect.foreach(println)
[[ 0.  0.  0.]; [ 0.  0.  0.]; [ 0.  0.  0.]; [ 0.  0.  0.]]
[[ 1.84716691  2.02576751  2.29557674]; [ 1.48971176  1.63375041  1.85134741]; [ 1.71759947  1.88367234  2.13455611]; [ 7.19431647  7.88992728  8.94077601]]
[[ 0.9905897   2.02161614  2.98849663]; [ 1.72361156  1.63488399  1.66213642]; [ 3.04783513  1.89011928  1.05847477]; [ 7.04822694  7.88921926  9.05895373]]
[[ 1.  2.  3.]; [ 2.  1.  2.]; [ 3.  2.  1.]; [ 7.  8.  9.]]

The whole pipeline is ready to be distributed across a cluster of multicore workstations! I think you must be in love with Scala/Spark by now.

Note that debugging the pipelined executions might be tricky as the data is passed from one process to another using OS pipes.
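Keep in mind that only stdout comes back through the pipe; anything svd.py writes to stderr ends up in the executor's stderr log. One trick that sometimes helps is to wrap the script in a shell and redirect its stderr to a known location on each worker. The following is only a sketch; the bash wrapper and the /tmp path are illustrative assumptions, not part of the original pipeline:

scala> sc.parallelize(List("1,2,3;2,1,2;3,2,1;7,8,9|1", "1,2,3;2,1,2;3,2,1;7,8,9|2"), 2).pipe(Seq("/bin/bash", "-c", "./svd.py 2>/tmp/svd_stderr_$$.log")).collect.foreach(println)

Each task then leaves its diagnostics in a separate file (keyed by the shell's process ID) that can be inspected on the worker after the job finishes.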

Jython and JSR 223

For completeness, we need to mention Jython, a Java implementation of Python (as opposed to the more familiar C implementation, also called CPython). Jython avoids the problem of passing input/output via OS pipes by allowing users to compile Python source code to Java bytecode and run the resulting bytecode on any Java virtual machine. As Scala also runs on the Java virtual machine, it can use Jython classes directly, although the reverse is not true in general: Scala classes are sometimes not compatible with Java/Jython.
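In fact, besides the JSR 223 route described next, the Jython interpreter can also be embedded directly from Scala. Here is a minimal sketch, assuming the jython-standalone-2.7.0.jar downloaded below is already on the classpath:

scala> import org.python.util.PythonInterpreter
import org.python.util.PythonInterpreter

scala> val interp = new PythonInterpreter()

scala> interp.exec("from datetime import datetime, timedelta; yesterday = str(datetime.now()-timedelta(days=1))")

scala> interp.get("yesterday", classOf[String])

The PythonInterpreter class comes straight from the Jython JAR; the JSR 223 engine used below is essentially a wrapper around it.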

Note

JSR 223

In this particular case, the request is for "Scripting for the Java™ Platform" and was originally filed on November 15, 2004 (https://www.jcp.org/en/jsr/detail?id=223). Initially, it targeted the ability of Java servlets to work with multiple scripting languages. The specification requires scripting-language maintainers to provide a Java JAR with the corresponding implementation. Portability issues have hindered practical implementations, particularly where the platform requires complex interaction with the OS, such as dynamic linking with C or Fortran. Currently, only a handful of languages are supported, with R and Python among them, but only in incomplete form.

Since Java 6, JSR 223 (Scripting for Java) provides the javax.script package, which allows multiple scripting languages to be called through the same API as long as the language supplies a script engine. To add the Jython scripting language, download the latest Jython JAR from the Jython site at http://www.jython.org/downloads.html:

$ wget -O jython-standalone-2.7.0.jar http://search.maven.org/remotecontent?filepath=org/python/jython-standalone/2.7.0/jython-standalone-2.7.0.jar

[akozlov@Alexanders-MacBook-Pro Scala]$ scala -cp jython-standalone-2.7.0.jar 
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import javax.script.ScriptEngine;
...
scala> import javax.script.ScriptEngineManager;
...
scala> import javax.script.ScriptException;
...
scala> val manager = new ScriptEngineManager();
manager: javax.script.ScriptEngineManager = javax.script.ScriptEngineManager@3a03464

scala> val engines = manager.getEngineFactories();
engines: java.util.List[javax.script.ScriptEngineFactory] = [org.python.jsr223.PyScriptEngineFactory@4909b8da, jdk.nashorn.api.scripting.NashornScriptEngineFactory@68837a77, scala.tools.nsc.interpreter.IMain$Factory@1324409e]

Now, I can use the Jython/Python scripting engine:

scala> val engine = new ScriptEngineManager().getEngineByName("jython");
engine: javax.script.ScriptEngine = org.python.jsr223.PyScriptEngine@6094de13

scala> engine.eval("from datetime import datetime, timedelta; yesterday = str(datetime.now()-timedelta(days=1))")
res15: Object = null

scala> engine.get("yesterday")
res16: Object = 2016-02-12 23:26:38.012000

It is worth giving a disclaimer here that not all Python modules are available in Jython. Modules that require dynamic linkage against C/Fortran libraries that don't exist in Java are not likely to work in Jython. Specifically, NumPy and SciPy are not supported in Jython, as they rely on C/Fortran. If you discover some other missing module, you can try copying the .py files from a Python distribution into a directory on Jython's sys.path; if this works, consider yourself lucky.
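If you do copy a pure-Python module over, you can inspect and extend the interpreter's module search path from the Scala side using the engine defined above. The following is just a sketch; the /usr/local/jython-modules directory is purely hypothetical:

scala> engine.eval("import sys; sys.path.append('/usr/local/jython-modules'); print(sys.path)")

Pure-Python modules placed in that directory then become importable from subsequent engine.eval calls.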

Jython has the advantage of accessing rich Python modules without the need to start a Python runtime on each call, which might result in significant performance savings:

scala> val startTime = System.nanoTime
startTime: Long = 54384084381087

scala> for (i <- 1 to 100) {
     |   engine.eval("from datetime import datetime, timedelta; yesterday = str(datetime.now()-timedelta(days=1))")
     |   val yesterday = engine.get("yesterday")
     | }

scala> val elapsed = 1e-9 * (System.nanoTime - startTime)
elapsed: Double = 0.270837934

scala> val startTime = System.nanoTime
startTime: Long = 54391560460133

scala> for (i <- 1 to 100) {
     |   val yesterday = Process(Seq("/usr/local/bin/python", "-c", """from datetime import datetime, timedelta; print(datetime.now()-timedelta(days=1))""")).!!
     | }

scala> val elapsed = 1e-9 * (System.nanoTime - startTime)
elapsed: Double = 2.221937263

The Jython JSR 223 call is almost an order of magnitude faster!
