In the previous chapter, we worked with the MLlib package in Spark, which operated strictly on RDDs. In this chapter, we move to the ML part of Spark, which operates strictly on DataFrames. Also, according to the Spark documentation, the primary machine learning API for Spark is now the DataFrame-based set of models contained in the spark.ml package.
So, let's get to it!
In this chapter, we will reuse a portion of the dataset we played with in the previous chapter. The data can be downloaded from http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.
In this chapter, you will learn how to do the following:
At the top level, the package exposes three main abstractions: a Transformer, an Estimator, and a Pipeline. We will briefly explain each with some short examples. We will provide more concrete examples of some of the models in the last section of this chapter.
The Transformer class, as the name suggests, transforms your data by (normally) appending a new column to your DataFrame.
At a high level, when deriving from the Transformer abstract class, each and every new Transformer needs to implement a .transform(...) method. As its first and normally only obligatory parameter, the method requires a DataFrame to be transformed. Beyond that, the parameters vary from one transformer to another in the ML package: other popular parameters are inputCol and outputCol; these, however, frequently default to some predefined values, such as, for example, 'features' for the featuresCol parameter that models use to locate their input vector.
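To make the .transform(...) contract concrete, here is a toy, plain-Python sketch; it is not PySpark code. It assumes a hypothetical stand-in for a DataFrame (a list of dicts) and a hypothetical ToyElementwiseProduct class that mimics the ElementwiseProduct transformer from the list that follows: it reads inputCol and appends a new outputCol, without needing any fitting step.

```python
from typing import Dict, List, Sequence

# Hypothetical stand-in for a DataFrame: a list of rows (column name -> value).
Row = Dict[str, object]


class ToyElementwiseProduct:
    """Toy transformer (NOT PySpark's class) mimicking ElementwiseProduct."""

    def __init__(self, scalingVec: Sequence[float], inputCol: str = "features",
                 outputCol: str = "scaled"):
        self.scalingVec = list(scalingVec)
        self.inputCol = inputCol
        self.outputCol = outputCol

    def transform(self, df: List[Row]) -> List[Row]:
        # A transformer only needs the data: there is no fitting step.
        out = []
        for row in df:
            vec = row[self.inputCol]
            if len(vec) != len(self.scalingVec):
                raise ValueError("vector and scalingVec must have the same length")
            # Copy the row and append the new column; the input rows stay unchanged.
            out.append({**row, self.outputCol: [v * s for v, s in zip(vec, self.scalingVec)]})
        return out


df = [{"features": [10.0, 3.0, 15.0]}]
result = ToyElementwiseProduct(scalingVec=[0.99, 3.30, 0.66]).transform(df)
print([round(x, 6) for x in result[0]["scaled"]])  # [9.9, 9.9, 9.9]
```

The rounding only hides floating-point noise in the products; the point of the sketch is that the output is a new column computed row by row from existing ones.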
There are many Transformers offered in the spark.ml.feature package, and we will briefly describe them here (before we use some of them later in this chapter):
Binarizer: Given a threshold, the method takes a continuous variable and transforms it into a binary one.
Bucketizer: Similar to the Binarizer, this method takes a list of thresholds (the splits parameter) and transforms a continuous variable into a multinomial one.
ChiSqSelector: For categorical target variables (think classification models), this feature allows you to select a predefined number of features (parameterized by the numTopFeatures parameter) that are most strongly associated with the target. The selection is done, as the name of the method suggests, using a Chi-Square test. It is one of the two-step methods: first, you need to .fit(...) your data (so the method can calculate the Chi-square tests). Calling the .fit(...) method (you pass your DataFrame as a parameter) returns a ChiSqSelectorModel object that you can then use to transform your DataFrame using the .transform(...) method. More information on Chi-squares can be found here: http://ccnmtl.columbia.edu/projects/qmss/the_chisquare_test/about_the_chisquare_test.html.
CountVectorizer: This is useful for tokenized text (such as [['Learning', 'PySpark', 'with', 'us'],['us', 'us', 'us']]). It is one of the two-step methods: first, you need to .fit(...), that is, learn the patterns (the vocabulary) from your dataset, before you can .transform(...) with the CountVectorizerModel returned by the .fit(...) method. The output from this transformer, for the tokenized text presented previously, would look similar to this: [(4, [0, 1, 2, 3], [1.0, 1.0, 1.0, 1.0]), (4, [0], [3.0])]. The most frequent term, 'us', receives index 0 because the vocabulary is ordered by term frequency across the corpus.
DCT: The Discrete Cosine Transform takes a vector of real values and returns a vector of the same length, but expressed as a sum of cosine functions oscillating at different frequencies. Such transformations are useful for extracting underlying frequencies in your data or for data compression.
ElementwiseProduct: A method that returns a vector whose elements are the products of the corresponding elements of the vector passed to the method and a vector passed as the scalingVec parameter. For example, if you had a [10.0, 3.0, 15.0] vector and your scalingVec was [0.99, 3.30, 0.66], then the vector you would get would look as follows: [9.9, 9.9, 9.9].
HashingTF: A hashing trick transformer that takes a list of tokenized text and returns a vector (of predefined length) with counts. From PySpark's documentation: "Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns."
IDF: This method computes the Inverse Document Frequency for a list of documents. Note that the documents need to already be represented as vectors (for example, using either the HashingTF or CountVectorizer).
IndexToString: A complement to the StringIndexer method. It uses the encoding from the StringIndexerModel object to map the string indices back to the original values. As an aside, please note that this sometimes does not work (for example, when the column metadata produced by the StringIndexer is not available), and you then need to specify the values from the StringIndexer explicitly through the labels parameter.
MaxAbsScaler: Rescales each feature to the [-1.0, 1.0] range by dividing it by its maximum absolute value (thus, it does not shift the center of the data).
MinMaxScaler: This is similar to the MaxAbsScaler, with the difference that it scales the data to be in the [0.0, 1.0] range (by default).
NGram: This method takes a list of tokenized text and returns n-grams: pairs, triples, or longer sequences of consecutive words. For example, if you had a ['good', 'morning', 'Robin', 'Williams'] vector, you would get the following output (for the default n=2): ['good morning', 'morning Robin', 'Robin Williams'].
Normalizer: This method scales the data to be of unit norm using the p-norm value (by default, it is L2).
OneHotEncoder: This method encodes a column of categorical indices (for example, the output of a StringIndexer) into a column of binary vectors.
PCA: Performs dimensionality reduction using principal component analysis.
PolynomialExpansion: Performs a polynomial expansion of a vector. For example, if you had a vector symbolically written as [x, y, z], the method would produce the following expansion (for the default degree of 2): [x, x*x, y, x*y, y*y, z, x*z, y*z, z*z].
QuantileDiscretizer: Similar to the Bucketizer method, but instead of passing the splits parameter, you pass the numBuckets one. The method then decides, by calculating approximate quantiles over your data, what the splits should be.
RegexTokenizer: This is a string tokenizer that uses regular expressions.
RFormula: For those of you who are avid R users, you can pass a formula such as y ~ alpha + beta (assuming your DataFrame has the y, alpha, and beta columns), and it will produce a features vector column built from alpha and beta, along with a label column built from y.
SQLTransformer: Similar to the previous one, but instead of R-like formulas, you can use SQL syntax: a SELECT statement in which __THIS__ stands for the input DataFrame.
StandardScaler: Standardizes the column to have a standard deviation equal to 1 and, if the withMean parameter is set to True, a mean of 0 (by default, the data is not centered).
StopWordsRemover: Removes stop words (such as 'the' or 'a') from tokenized text.
StringIndexer: Given a column of strings, this will produce a column of label indices; by default, the most frequent value gets index 0.
Tokenizer: This is the default tokenizer that converts the string to lowercase and then splits it on whitespace.
VectorAssembler: This is a highly useful transformer that collates multiple numeric (vectors included) columns into a single column with a vector representation. For example, if you had three columns in your DataFrame:
```python
# spark is an existing SparkSession (as in the PySpark shell)
df = spark.createDataFrame(
    [(12, 10, 3), (1, 4, 2)],
    ['a', 'b', 'c'])
```
The output of calling:
```python
import pyspark.ml.feature as ft

(ft.VectorAssembler(inputCols=['a', 'b', 'c'], outputCol='features')
    .transform(df)
    .select('features')
    .collect())
```
would look as follows:
[Row(features=DenseVector([12.0, 10.0, 3.0])), Row(features=DenseVector([1.0, 4.0, 2.0]))]
VectorIndexer: This is a method for indexing categorical features within a vector column. It works feature by feature: features with at most maxCategories distinct values are treated as categorical, their distinct values are sorted, and each value is replaced by its index instead of the original value.
VectorSlicer: Works on a feature vector, either dense or sparse: given a list of indices, it extracts the values from the feature vector.
Word2Vec: This method takes a sentence (represented as a list of words) as input and learns a map in the {string, vector} format, a representation that is useful in natural language processing; the fitted model transforms each document into the average of its word vectors.
Estimators can be thought of as statistical models that need to be estimated to make predictions or classify your observations.
If deriving from the abstract Estimator class, the new model has to implement the .fit(...) method, which fits the model given the data found in a DataFrame and some default or user-specified parameters.
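The two-step fit-then-transform pattern (the same one that ChiSqSelector and CountVectorizer follow) can be sketched in plain Python. This is a toy illustration, not PySpark's API: the hypothetical ToyMinMaxScaler estimator learns per-feature minima and maxima in .fit(...) and returns a hypothetical ToyMinMaxScalerModel, which is itself a transformer that reuses those frozen statistics.

```python
from typing import Dict, List

# Hypothetical stand-in for a DataFrame: a list of rows (column name -> value).
Row = Dict[str, object]


class ToyMinMaxScalerModel:
    """Fitted model: a transformer that holds the statistics learned by fit()."""

    def __init__(self, mins: List[float], maxs: List[float], inputCol: str, outputCol: str):
        self.mins, self.maxs = mins, maxs
        self.inputCol, self.outputCol = inputCol, outputCol

    def transform(self, df: List[Row]) -> List[Row]:
        out = []
        for row in df:
            scaled = [
                # A constant feature (max == min) is mapped to 0.5 in this sketch.
                (v - lo) / (hi - lo) if hi > lo else 0.5
                for v, lo, hi in zip(row[self.inputCol], self.mins, self.maxs)
            ]
            out.append({**row, self.outputCol: scaled})
        return out


class ToyMinMaxScaler:
    """Toy estimator: fit() learns per-feature minima and maxima and returns a model."""

    def __init__(self, inputCol: str = "features", outputCol: str = "scaled"):
        self.inputCol, self.outputCol = inputCol, outputCol

    def fit(self, df: List[Row]) -> ToyMinMaxScalerModel:
        # Transpose the rows' vectors: one tuple of values per feature.
        features = list(zip(*(row[self.inputCol] for row in df)))
        mins = [min(f) for f in features]
        maxs = [max(f) for f in features]
        return ToyMinMaxScalerModel(mins, maxs, self.inputCol, self.outputCol)


train = [{"features": [1.0, 10.0]}, {"features": [3.0, 30.0]}, {"features": [2.0, 20.0]}]
model = ToyMinMaxScaler().fit(train)                              # step 1: learn min/max
print(model.transform(train)[2]["scaled"])                        # step 2: [0.5, 0.5]
print(model.transform([{"features": [5.0, 10.0]}])[0]["scaled"])  # [2.0, 0.0]
```

The last line shows why the split matters: new data is scaled with the statistics learned during fitting, so a value outside the training range (5.0) lands outside [0.0, 1.0].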
There are a lot of estimators available in PySpark, and we will now briefly describe the models available in Spark 2.0.
The ML package provides a data scientist with seven classification models to choose from. These range from the simplest ones (such as logistic regression) to more sophisticated ones. We provide a short description of each of them in the following list:
LogisticRegression: The benchmark model for classification. Logistic regression uses a logit function to calculate the probability of an observation belonging to a particular class. At the time of writing (Spark 2.0), PySpark ML supports only binary classification problems with this model.
DecisionTreeClassifier: A classifier that builds a decision tree to predict a class for an observation. The maxDepth parameter limits how deep the tree grows, minInstancesPerNode determines the minimum number of observations each child node must contain for a split to be made, the maxBins parameter specifies the maximum number of bins the continuous variables will be split into, and the impurity parameter specifies the metric used to calculate the information gain from a split.
GBTClassifier: A Gradient Boosted Trees model for classification. The model belongs to the family of ensemble models: models that combine multiple weak predictive models to form a strong one. At the moment, the GBTClassifier model supports binary labels, and continuous and categorical features.
RandomForestClassifier: This model produces multiple decision trees (hence the name, forest) and uses the mode of the outputs of those decision trees to classify observations. The RandomForestClassifier supports both binary and multinomial labels.
NaiveBayes: Based on Bayes' theorem, this model uses conditional probability theory to classify observations. The NaiveBayes model in PySpark ML supports both binary and multinomial labels.
MultilayerPerceptronClassifier: A classifier loosely inspired by the way a human brain works. Deeply rooted in Artificial Neural Networks theory, the model is a black box, that is, it is not easy to interpret its internal parameters. The model consists of at least three fully connected layers of artificial neurons (specified through the layers parameter when creating the model object): an input layer (whose size needs to equal the number of features in your dataset), one or more hidden layers, and an output layer with the number of neurons equal to the number of categories in your label. The neurons in the hidden layers use a sigmoid activation function, whereas the activation function of the neurons in the output layer is softmax.
OneVsRest: A reduction of a multiclass classification to a binary one. In the case of a multinomial label, the model trains one binary classifier (for example, a logistic regression) per class. For instance, for the class label == 2, the model converts label == 2 to 1 (with all remaining label values set to 0) and then trains a binary model. All the models are then scored, and the model with the highest score wins.
There are seven models available for regression tasks in the PySpark ML package. As with classification, these range from some basic ones (such as the obligatory linear regression) to more complex ones:
AFTSurvivalRegression: Fits an Accelerated Failure Time regression model. It is a parametric model that assumes that the marginal effect of a feature accelerates or decelerates the life expectancy (or the time to process failure). It is highly applicable to processes with well-defined stages.
DecisionTreeRegressor: Similar to the model for classification, with the obvious distinction that the label is continuous instead of binary (or multinomial).
GBTRegressor: As with the DecisionTreeRegressor, the difference from its classification counterpart is the data type of the label.
GeneralizedLinearRegression: A family of linear models with differing link functions. In contrast to linear regression, which assumes normality of the error terms, a GLM allows the label to follow different error distributions: the GeneralizedLinearRegression model from the PySpark ML package supports the gaussian, binomial, gamma, and poisson families of error distributions with a host of different link functions.
IsotonicRegression: A type of regression that fits a free-form, non-decreasing line to your data. It is useful for fitting datasets with ordered and increasing observations.
LinearRegression: The simplest of the regression models; it assumes a linear relationship between the features and a continuous label, and normality of the error terms.
RandomForestRegressor: Similar to either the DecisionTreeRegressor or the GBTRegressor, the RandomForestRegressor fits a continuous label instead of a discrete one.
Clustering is a family of unsupervised models that are used to find underlying patterns in your data. The PySpark ML package provides four of the most popular models at the moment:
BisectingKMeans: A combination of the k-means clustering method and hierarchical clustering. The algorithm begins with all observations in a single cluster and iteratively bisects the clusters until there are k clusters. Check out this website for more information on the pseudo-algorithm: http://minethedata.blogspot.com/2012/08/bisecting-k-means.html.
KMeans: This is the famous k-means algorithm that separates data into k clusters, iteratively searching for centroids that minimize the sum of squared distances between each observation and the centroid of the cluster it belongs to.
GaussianMixture: This method uses k Gaussian distributions with unknown parameters to dissect the dataset. Using the Expectation-Maximization algorithm, the parameters of the Gaussians are found by maximizing the log-likelihood function.
LDA: This model (Latent Dirichlet Allocation) is used for topic modeling in natural language processing applications.
There is also one recommendation model available in PySpark ML, but we will refrain from describing it here.
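The iterative centroid search described for KMeans can be illustrated with a plain-Python sketch of Lloyd's algorithm (the classic k-means iteration). This is not how Spark implements it: Spark's KMeans runs distributed and, by default, picks its starting centroids with the k-means|| method, whereas this sketch assumes you pass the initial centroids explicitly; the kmeans and squared_distance functions are hypothetical helpers.

```python
from typing import List, Sequence, Tuple

Point = Tuple[float, ...]


def squared_distance(a: Sequence[float], b: Sequence[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


def kmeans(points: List[Point], initial: List[Point], max_iter: int = 20):
    """Lloyd's algorithm: alternate assignment and centroid-update steps."""
    centroids = [tuple(c) for c in initial]
    for _ in range(max_iter):
        # Assignment step: every point joins the cluster of its nearest centroid.
        clusters: List[List[Point]] = [[] for _ in centroids]
        for p in points:
            nearest = min(range(len(centroids)),
                          key=lambda i: squared_distance(p, centroids[i]))
            clusters[nearest].append(p)
        # Update step: move each centroid to the mean of its cluster
        # (an empty cluster keeps its previous centroid in this sketch).
        new_centroids = [
            tuple(sum(coord) / len(members) for coord in zip(*members)) if members else c
            for members, c in zip(clusters, centroids)
        ]
        if new_centroids == centroids:  # no centroid moved: converged
            break
        centroids = new_centroids
    # Objective being minimized: sum of squared distances to the nearest centroid.
    cost = sum(min(squared_distance(p, c) for c in centroids) for p in points)
    return centroids, cost


points = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
centroids, cost = kmeans(points, initial=[(0.0, 0.0), (10.0, 10.0)])
print([tuple(round(v, 3) for v in c) for c in centroids])  # [(0.333, 0.333), (10.333, 10.333)]
print(round(cost, 3))  # 2.667
```

Each iteration can only lower (or keep) the cost, which is why the loop stops as soon as the centroids no longer move.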
A Pipeline in PySpark ML is a concept of an end-to-end transformation-estimation process (with distinct stages) that ingests some raw data (in DataFrame form), performs the necessary data carpentry (transformations), and finally estimates a statistical model (estimator).
A Pipeline can be thought of as a chain of multiple discrete stages. When the .fit(...) method is executed on a Pipeline object, all the stages are executed in the order they were specified in the stages parameter; the stages parameter is a list of Transformer and Estimator objects. The .fit(...) method of the Pipeline object executes the .transform(...) method for the Transformers and the .fit(...) method for the Estimators (followed, if more stages come after it, by the .transform(...) method of the resulting fitted model, so that its output can feed the next stage).
Normally, the output of a preceding stage becomes the input for the following stage: when deriving from either the Transformer or Estimator abstract classes, it is customary to provide (for example, through the HasOutputCol mixin) a .getOutputCol() method that returns the value of the outputCol parameter specified when creating the object, so that it can be passed as the inputCol of the next stage.
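The way a Pipeline chains its stages can be sketched with a few toy, plain-Python classes. None of them are PySpark classes: ToyTokenizer, ToyCountVectorizer, ToyCountVectorizerModel, ToyPipeline, and ToyPipelineModel are hypothetical, and a list of dicts stands in for a DataFrame. The sketch shows the .fit(...) logic described above (transformers are used as-is, estimators are fitted and their model transforms the data for the next stage) and the getOutputCol() hand-off between stages.

```python
from collections import Counter
from typing import Dict, List

# Hypothetical stand-in for a DataFrame: a list of rows (column name -> value).
Row = Dict[str, object]


class ToyTokenizer:
    """Toy transformer: lower-cases a string column and splits it on whitespace."""

    def __init__(self, inputCol: str, outputCol: str):
        self.inputCol, self.outputCol = inputCol, outputCol

    def getOutputCol(self) -> str:
        return self.outputCol

    def transform(self, df: List[Row]) -> List[Row]:
        # Append a new column; the original columns are kept as they are.
        return [{**row, self.outputCol: row[self.inputCol].lower().split()} for row in df]


class ToyCountVectorizerModel:
    """Fitted model (a transformer) holding the learned vocabulary."""

    def __init__(self, vocab: List[str], inputCol: str, outputCol: str):
        self.index = {term: i for i, term in enumerate(vocab)}
        self.inputCol, self.outputCol = inputCol, outputCol

    def transform(self, df: List[Row]) -> List[Row]:
        out = []
        for row in df:
            vec = [0.0] * len(self.index)
            for term in row[self.inputCol]:
                if term in self.index:  # terms unseen during fit are ignored
                    vec[self.index[term]] += 1.0
            out.append({**row, self.outputCol: vec})
        return out


class ToyCountVectorizer:
    """Toy estimator: fit() learns a vocabulary ordered by total term count."""

    def __init__(self, inputCol: str, outputCol: str):
        self.inputCol, self.outputCol = inputCol, outputCol

    def fit(self, df: List[Row]) -> ToyCountVectorizerModel:
        counts = Counter(term for row in df for term in row[self.inputCol])
        # Most frequent term first; ties broken alphabetically (a choice made for this sketch).
        vocab = sorted(counts, key=lambda term: (-counts[term], term))
        return ToyCountVectorizerModel(vocab, self.inputCol, self.outputCol)


class ToyPipelineModel:
    def __init__(self, stages: list):
        self.stages = stages

    def transform(self, df: List[Row]) -> List[Row]:
        for stage in self.stages:  # every fitted stage is now a transformer
            df = stage.transform(df)
        return df


class ToyPipeline:
    def __init__(self, stages: list):
        self.stages = stages

    def fit(self, df: List[Row]) -> ToyPipelineModel:
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted (yielding a model); transformers are used as-is.
            transformer = stage.fit(df) if hasattr(stage, "fit") else stage
            fitted.append(transformer)
            if i < len(self.stages) - 1:  # feed this stage's output to the next stage
                df = transformer.transform(df)
        return ToyPipelineModel(fitted)


docs = [{"text": "Learning PySpark with us"}, {"text": "us us us"}]
tokenizer = ToyTokenizer(inputCol="text", outputCol="words")
vectorizer = ToyCountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="features")
model = ToyPipeline(stages=[tokenizer, vectorizer]).fit(docs)
print(model.transform(docs)[1]["features"])  # [3.0, 0.0, 0.0, 0.0]
```

Wiring the vectorizer's inputCol to tokenizer.getOutputCol() means the column name is defined in one place only, which is the reason the output-column convention is useful when building longer pipelines.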