Pipelines consist of a set of components joined together such that the DataFrame produced by one component is used as input for the next component. The components available are split into two classes: transformers and estimators.
Transformers transform one DataFrame into another, normally by appending one or more columns.
The first step in our spam classification algorithm is to split each message into an array of words. This is called tokenization. We can use the Tokenizer transformer, provided by MLlib:
scala> import org.apache.spark.ml.feature._
import org.apache.spark.ml.feature._
scala> val tokenizer = new Tokenizer()
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_75559f60e8cf
The behavior of transformers can be customized through getters and setters. The easiest way of obtaining a list of the available parameters is to call the .explainParams method:
scala> println(tokenizer.explainParams)
inputCol: input column name (undefined)
outputCol: output column name (default: tok_75559f60e8cf__output)
We see that the behavior of a Tokenizer instance can be customized using two parameters: inputCol and outputCol, giving the name of the column containing the input (the string to be tokenized) and of the column that will hold the output (the array of words), respectively. We can set these parameters using the setInputCol and setOutputCol methods.
We set inputCol to "text", since that is what the column is called in our training and test DataFrames. We will set outputCol to "words":
scala> tokenizer.setInputCol("text").setOutputCol("words")
org.apache.spark.ml.feature.Tokenizer = tok_75559f60e8cf
In due course, we will integrate tokenizer into a pipeline but, for now, let's use it to transform the training DataFrame to check that it works correctly.
scala> val tokenizedDF = tokenizer.transform(trainDF)
tokenizedDF: DataFrame = [fileName: string, text: string, category: string, words: array<string>]
scala> tokenizedDF.show
+--------------+----------------+--------+--------------------+
|      fileName|            text|category|               words|
+--------------+----------------+--------+--------------------+
|file:/Users...|Subject: auto...|    spam|[subject:, auto, ...|
|file:/Users...|Subject: want...|    spam|[subject:, want, ...|
|file:/Users...|Subject: n't ...|    spam|[subject:, n't, m...|
|file:/Users...|Subject: amaz...|    spam|[subject:, amaze,...|
|file:/Users...|Subject: help...|    spam|[subject:, help, ...|
|file:/Users...|Subject: beat...|    spam|[subject:, beat, ...|
...
+--------------+----------------+--------+--------------------+
The tokenizer transformer produces a new DataFrame with an additional column, words, containing an array of the words in the text column.
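To make the transformation concrete, here is a plain-Scala sketch of what Tokenizer does to a single value of the text column, assuming the behavior described in the MLlib documentation: the string is lowercased and then split on whitespace. The object and function names are hypothetical, and this is not Spark's source:

```scala
object TokenizerSketch {
  // Hypothetical stand-in for Tokenizer's per-value logic:
  // lowercase the text, then split it on single whitespace characters.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("\\s").toSeq
}

val tokens = TokenizerSketch.tokenize("Subject: Auto fix")
println(tokens.mkString("[", ", ", "]")) // prints [subject:, auto, fix]
```

Punctuation stays attached to words ("subject:"), as in the output above, and two consecutive spaces produce an empty token. MLlib's RegexTokenizer gives finer control over how the text is split.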
Clearly, we can use our tokenizer to transform any DataFrame with the correct schema, that is, any DataFrame with a text column. We could, for instance, use it on the test set. Much of machine learning involves applying the same (or a very similar) pipeline to different data sets. By providing the pipeline abstraction, MLlib facilitates reasoning about complex machine learning algorithms consisting of many cleaning, transformation, and modeling components.
The next step in our pipeline is to calculate the frequency of occurrence of each word in each message. We will eventually use these frequencies as features in our algorithm. We will use the HashingTF transformer to convert the array of words of each message into a word frequency vector.
The HashingTF transformer constructs a sparse vector of word frequencies from input iterables. Each element in the word array is transformed to a hash code, which is then reduced to a value between 0 and n - 1, where n, a large number, is the total number of elements in the output vector. Element i of the term frequency vector is the number of words whose reduced hash code is i.
Let's run through an example manually to understand how this works. We will calculate the term frequency vector for Array("the", "dog", "jumped", "over", "the"). Let's set n, the number of elements in the sparse output vector, to 16 for this example. The first step is to calculate the hash code for each element in our array. We can use the built-in ## method, which calculates a hash code for any object:
scala> val words = Array("the", "dog", "jumped", "over", "the")
words: Array[String] = Array(the, dog, jumped, over, the)
scala> val hashCodes = words.map { _.## }
hashCodes: Array[Int] = Array(114801, 99644, -1148867251, 3423444, 114801)
To transform the hash codes into valid vector indices, we take each hash code modulo the size of the vector (16, in this case). Since hash codes can be negative, we also take the absolute value of the result:
scala> val indices = hashCodes.map { code => Math.abs(code % 16) }
indices: Array[Int] = Array(1, 12, 3, 4, 1)
We can then create a mapping from indices to the number of times that index appears:
scala> val indexFrequency = indices.groupBy(identity).mapValues { _.size.toDouble }
indexFrequency: Map[Int,Double] = Map(4 -> 1.0, 1 -> 2.0, 3 -> 1.0, 12 -> 1.0)
Finally, we can convert this map to a sparse vector, where the value at each element in the vector is the frequency with which this particular index occurs:
scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._
scala> val termFrequencies = Vectors.sparse(16, indexFrequency.toSeq)
termFrequencies: linalg.Vector = (16,[1,3,4,12],[2.0,1.0,1.0,1.0])
Note that the .toString output for a sparse vector consists of three elements: the total size of the vector, a list of indices, and a list of the values at those indices.
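The following plain-Scala sketch mimics that three-part layout for the term frequencies computed above. The SparseVec class and fromPairs helper are hypothetical stand-ins, not MLlib's own vector classes:

```scala
object SparseSketch {
  // Minimal sparse vector: a size plus parallel arrays of indices and values.
  // Hypothetical helper for illustration; not MLlib's SparseVector.
  final case class SparseVec(size: Int, indices: Array[Int], values: Array[Double]) {
    // Mirrors the (size,[indices],[values]) layout printed by the REPL.
    override def toString: String =
      s"($size,${indices.mkString("[", ",", "]")},${values.mkString("[", ",", "]")})"

    // Expand to a dense array: every index not listed holds 0.0.
    def toDense: Array[Double] = {
      val dense = Array.fill(size)(0.0)
      indices.zip(values).foreach { case (i, v) => dense(i) = v }
      dense
    }
  }

  // Build a sparse vector from (index, value) pairs, sorted by index.
  def fromPairs(size: Int, pairs: Seq[(Int, Double)]): SparseVec = {
    val sorted = pairs.sortBy(_._1)
    SparseVec(size, sorted.map(_._1).toArray, sorted.map(_._2).toArray)
  }
}

val tf = SparseSketch.fromPairs(16, Seq(4 -> 1.0, 1 -> 2.0, 3 -> 1.0, 12 -> 1.0))
println(tf) // prints (16,[1,3,4,12],[2.0,1.0,1.0,1.0])
```

Only four of the sixteen entries are stored; toDense shows the twelve implicit zeros.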
Using a sparse vector provides a compact and efficient way of representing the frequency of occurrence of words in the message, and this is essentially how HashingTF works under the hood. The disadvantage is that the mapping from words to indices is not necessarily unique: reducing hash codes modulo the length of the vector can map different strings to the same index. This is known as a collision. The usual remedy is to make n large enough that collisions are rare.
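The manual steps above can be wrapped into a single function. The sketch below is an illustration, not Spark's code, and makes one change: instead of Math.abs, it uses a non-negative modulo (adding n to a negative remainder), which does not fold negative and positive remainders onto the same buckets. With this mapping, "jumped" (hash code -1148867251, remainder -3) lands in bucket 13 rather than 3. Spark's own HashingTF may differ in both the hash function and the handling of negative codes, so the exact indices should not be expected to match Spark's. The strings "Aa" and "BB" have the same ## hash code (2112), so they collide for every choice of n:

```scala
object HashingSketch {
  // Reduce a (possibly negative) hash code to a bucket in [0, n).
  // Math.abs(code % n) would also land in range, but would send remainders
  // of -3 and 3 to the same bucket; adding n to negative remainders does not.
  def nonNegativeMod(code: Int, n: Int): Int = {
    val raw = code % n
    if (raw < 0) raw + n else raw
  }

  // Term frequencies as a map from bucket index to count, using Scala's ## hash.
  def termFrequencies(words: Seq[String], n: Int): Map[Int, Double] =
    words
      .map(w => nonNegativeMod(w.##, n))
      .groupBy(identity)
      .map { case (index, hits) => index -> hits.size.toDouble }
}

val example = HashingSketch.termFrequencies(Seq("the", "dog", "jumped", "over", "the"), 16)
// "Aa" and "BB" share the hash code 2112, so they always collide.
val collision = HashingSketch.termFrequencies(Seq("Aa", "BB"), 16)
println(example)
println(collision)
```

In the second result, bucket 0 holds a count of 2.0, and nothing in the vector records that it came from two different words.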
HashingTF is similar to building a hash table (for example, a Scala map) whose keys are words and whose values are the number of times that word occurs in the message, with one important difference: it does not attempt to deal with hash collisions. Thus, if two words map to the same index, the element at that index holds their combined count, and neither word gets its correct frequency. There are two advantages to using this algorithm over just constructing a hash table. First, we do not need to maintain a dictionary of distinct words in memory. Second, each message can be transformed to a vector independently of all the others, without first gathering the set of distinct words across partitions: we can apply the HashingTF transformation on each partition independently. The main disadvantage is that we must use machine learning algorithms that can take advantage of the sparse representation efficiently. This is the case with logistic regression, which we will use here.
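The following toy sketch (plain Scala, hypothetical names, invented data) contrasts the two approaches. Hashing a message needs nothing but the message itself, so splitting the messages into "partitions" makes no difference to the result; the vocabulary approach needs a pass over all the messages before any of them can be vectorized:

```scala
object PartitionSketch {
  // Bucket index for a word (non-negative modulo of Scala's ## hash).
  def bucket(word: String, n: Int): Int = {
    val raw = word.## % n
    if (raw < 0) raw + n else raw
  }

  // Hashing: a message is vectorized on its own, with no shared state.
  def hashVectorize(msg: Seq[String], n: Int): Map[Int, Int] =
    msg.groupBy(w => bucket(w, n)).map { case (i, ws) => i -> ws.size }

  // Vocabulary approach: needs every message up front to build the dictionary.
  def buildVocabulary(all: Seq[Seq[String]]): Map[String, Int] =
    all.flatten.distinct.sorted.zipWithIndex.toMap

  def vocabVectorize(msg: Seq[String], vocab: Map[String, Int]): Map[Int, Int] =
    msg.groupBy(vocab).map { case (i, ws) => i -> ws.size }
}

// Toy tokenized messages, split into two "partitions".
val partitions: Seq[Seq[Seq[String]]] = Seq(
  Seq(Seq("win", "cash", "now"), Seq("meeting", "at", "noon")),
  Seq(Seq("cash", "prize", "cash"))
)
val numBuckets = 1 << 20

// Each partition is hashed without looking at the other...
val perPartition = partitions.flatMap(part => part.map(PartitionSketch.hashVectorize(_, numBuckets)))
// ...and the result matches hashing all messages in one place.
val allAtOnce = partitions.flatten.map(PartitionSketch.hashVectorize(_, numBuckets))

// The vocabulary approach first needs a global pass over all partitions.
val vocab = PartitionSketch.buildVocabulary(partitions.flatten)
val vocabVectors = partitions.flatten.map(PartitionSketch.vocabVectorize(_, vocab))
```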
As you might expect, the HashingTF transformer takes, as parameters, the input and output columns. It also takes a parameter, numFeatures, defining the number of distinct hash buckets in the vector. Increasing the number of buckets decreases the number of collisions. In practice, a large power of two is a common choice; the code below uses 2^20 = 1,048,576.
scala> val hashingTF = (new HashingTF()
  .setInputCol("words")
  .setOutputCol("features")
  .setNumFeatures(1048576))
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_3b78eca9595c
scala> val hashedDF = hashingTF.transform(tokenizedDF)
hashedDF: DataFrame = [fileName: string, text: string, category: string, words: array<string>, features: vector]
scala> hashedDF.select("features").show
+--------------------+
|            features|
+--------------------+
|(1048576,[0,33,36...|
|(1048576,[0,36,40...|
|(1048576,[0,33,34...|
|(1048576,[0,33,36...|
|(1048576,[0,33,34...|
|(1048576,[0,33,34...|
+--------------------+
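To get a feel for how the number of buckets affects collisions, we can estimate how many distinct words end up sharing a bucket. The sketch assumes that the hash function spreads V distinct words uniformly and independently over n buckets. The expected number of occupied buckets is then n(1 - (1 - 1/n)^V), and V minus that quantity is the expected number of collisions, counted as words beyond the first in each occupied bucket. The vocabulary size of 50,000 is a made-up figure for illustration:

```scala
object CollisionEstimate {
  // Expected number of colliding words (words beyond the first in each
  // occupied bucket), assuming v distinct words are hashed uniformly and
  // independently into n buckets.
  def expectedCollisions(v: Long, n: Long): Double = {
    // Probability that a given bucket stays empty: (1 - 1/n)^v,
    // computed via log1p to stay accurate when 1/n is tiny.
    val pEmpty = math.exp(v * math.log1p(-1.0 / n))
    val occupied = n * (1.0 - pEmpty)
    v - occupied
  }
}

val vocabularySize = 50000L // made-up vocabulary size, for illustration
for (bits <- Seq(16, 18, 20)) {
  val n = 1L << bits
  val c = CollisionEstimate.expectedCollisions(vocabularySize, n)
  println(f"n = 2^$bits%d: about $c%.0f colliding words")
}
```

When collisions are rare, the estimate behaves roughly like V^2 / (2n), so quadrupling n cuts the expected number of collisions by roughly a factor of four.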
Each element in the features column is a sparse vector:
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val firstRow = hashedDF.select("features").first
firstRow: org.apache.spark.sql.Row = ...
scala> val Row(v: Vector) = firstRow
v: Vector = (1048576,[0,33,36,37,...],[1.0,3.0,4.0,1.0,...])
We can thus interpret our vector as follows: the word (or words) hashing to element 33 occurs three times in this message, the word hashing to element 36 occurs four times, and so on.
We now have the features ready for logistic regression. The last step prior to running logistic regression is to create the target variable. We will transform the category column in our DataFrame to a binary 0/1 target column. Spark provides a StringIndexer class that maps each distinct string in a column to a double. A StringIndexer is not a transformer: it must first be fitted to a set of categories to calculate the mapping from string to numeric value. This introduces the second class of components in the pipeline API: estimators.
Unlike a transformer, which works "out of the box", an estimator must be fitted to a DataFrame. For our string indexer, the fitting process involves obtaining the list of unique strings ("spam" and "ham") and mapping each of these to a double. The fitting process outputs a transformer which can be used on subsequent DataFrames.
scala> val indexer = (new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label"))
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_16db03fd0546
scala> val indexTransform = indexer.fit(trainDF)
indexTransform: StringIndexerModel = strIdx_16db03fd0546
The transformer produced by the fitting process has a labels attribute describing the mapping it applies:
scala> indexTransform.labels
Array[String] = Array(ham, spam)
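Each label is mapped to its index in this array: our transformer therefore maps ham to 0 and spam to 1. By default, StringIndexer orders labels by decreasing frequency, so the most frequent category receives index 0: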
scala> val labelledDF = indexTransform.transform(hashedDF)
labelledDF: org.apache.spark.sql.DataFrame = [fileName: string, text: string, category: string, words: array<string>, features: vector, label: double]
scala> labelledDF.select("category", "label").distinct.show
+--------+-----+
|category|label|
+--------+-----+
|     ham|  0.0|
|    spam|  1.0|
+--------+-----+
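The estimator/transformer split can be sketched in plain Scala, with a "DataFrame" modelled as a sequence of maps from column name to value. ToyIndexer and ToyIndexerModel are hypothetical and only mimic StringIndexer's default behavior of ordering labels by decreasing frequency; the alphabetical tie-break is an assumption of this sketch:

```scala
object EstimatorSketch {
  // A toy "DataFrame": each row maps column names to values.
  type Row = Map[String, Any]
  type Table = Seq[Row]

  // A transformer maps one table to another, typically adding a column.
  trait Transformer { def transform(table: Table): Table }

  // An estimator must first be fitted to a table; fitting yields a transformer.
  trait Estimator { def fit(table: Table): Transformer }

  // Hypothetical StringIndexer-like estimator: labels ordered by decreasing
  // frequency, ties broken alphabetically (an assumption of this sketch).
  final class ToyIndexer(inputCol: String, outputCol: String) extends Estimator {
    def fit(table: Table): ToyIndexerModel = {
      val counts = table.map(_(inputCol).toString).groupBy(identity).map { case (s, xs) => s -> xs.size }
      val labels = counts.toSeq.sortBy { case (s, c) => (-c, s) }.map(_._1)
      new ToyIndexerModel(labels, inputCol, outputCol)
    }
  }

  // The fitted transformer: maps each string to its position in `labels`.
  // Unseen strings throw NoSuchElementException.
  final class ToyIndexerModel(val labels: Seq[String], inputCol: String, outputCol: String)
      extends Transformer {
    private val index = labels.zipWithIndex.toMap
    def transform(table: Table): Table =
      table.map(row => row + (outputCol -> index(row(inputCol).toString).toDouble))
  }
}

import EstimatorSketch._

val train: Table = Seq(
  Map("category" -> "spam"),
  Map("category" -> "ham"),
  Map("category" -> "ham")
)
val indexerModel = new ToyIndexer("category", "label").fit(train)
val labelled = indexerModel.transform(train)
println(indexerModel.labels)
labelled.foreach(println)
```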
We now have the feature vectors and classification labels in the correct format for logistic regression. The component for performing logistic regression is an estimator: it is fitted to a training DataFrame to create a trained model. The model can then be used to transform test DataFrames.
scala> import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegression
scala> val classifier = new LogisticRegression().setMaxIter(50)
classifier: LogisticRegression = logreg_a5e921e7c1a1
By default, the LogisticRegression estimator expects the feature column to be named "features" and the label column (the target) to be named "label". There is no need to set these explicitly, since they match the column names set by hashingTF and indexer. Several parameters control how logistic regression works:
scala> println(classifier.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100, current: 50)
regParam: regularization parameter (>= 0) (default: 0.0)
threshold: threshold in binary classification prediction, in range [0, 1] (default: 0.5)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
...
For now, we just set the maxIter parameter. We will look at the effect of other parameters, such as regularization, later on. Let's now fit the classifier to labelledDF:
scala> val trainedClassifier = classifier.fit(labelledDF)
trainedClassifier: LogisticRegressionModel = logreg_353d18f6a5f0
This produces a transformer that we can use on a DataFrame with a features column. The transformer appends, among others, a prediction column and a probability column. We can, for instance, use trainedClassifier to transform labelledDF, the training set itself:
scala> val labelledDFWithPredictions = trainedClassifier.transform(labelledDF)
labelledDFWithPredictions: DataFrame = [fileName: string, ...
scala> labelledDFWithPredictions.select($"label", $"prediction").show
+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
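Conceptually, a fitted binary logistic regression model holds one weight per feature and an intercept. To score a message, it takes the dot product of the weights with the (sparse) feature vector, adds the intercept, and passes the result through the logistic function to obtain the probability of the positive class (here, spam); the prediction is 1.0 when that probability exceeds the threshold (0.5 by default, as listed by explainParams). The plain-Scala sketch below uses made-up weights and hypothetical names. Spark's probability column holds the probabilities of both classes as a vector, whereas this sketch returns only P(spam):

```scala
object LogisticSketch {
  // Margin: intercept plus the dot product of the weights with a sparse
  // feature vector given as (index -> value) pairs.
  def margin(features: Map[Int, Double], weights: Array[Double], intercept: Double): Double =
    features.foldLeft(intercept) { case (acc, (i, x)) => acc + weights(i) * x }

  // Logistic function: turns the margin into P(label = 1), i.e. P(spam).
  def probability(m: Double): Double = 1.0 / (1.0 + math.exp(-m))

  // Predict 1.0 (spam) when P(spam) exceeds the threshold, else 0.0 (ham).
  def predict(features: Map[Int, Double], weights: Array[Double],
              intercept: Double, threshold: Double = 0.5): Double =
    if (probability(margin(features, weights, intercept)) > threshold) 1.0 else 0.0
}

// Made-up model: 3 features, intercept -0.5.
val weights = Array(0.0, 2.0, -1.0)
val intercept = -0.5
val spammy = Map(1 -> 1.0) // margin = -0.5 + 2.0 = 1.5
val hammy = Map(2 -> 1.0)  // margin = -0.5 - 1.0 = -1.5
println(LogisticSketch.predict(spammy, weights, intercept)) // prints 1.0
println(LogisticSketch.predict(hammy, weights, intercept))  // prints 0.0
```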
A quick way of checking the performance of our model is to just count the number of misclassified messages:
scala> labelledDFWithPredictions.filter { $"label" !== $"prediction" }.count
Long = 1
In this case, logistic regression managed to correctly classify every message but one in the training set. This is perhaps unsurprising, given the large number of features and the relatively clear demarcation between the words used in spam and legitimate e-mails.
Of course, the real test of a model is not how well it performs on the training set, but how well it performs on a test set. To test this, we could just push the test DataFrame through the same stages that we used to train the model, replacing each estimator with the fitted transformer that it produced. MLlib provides the pipeline abstraction to facilitate this: we wrap an ordered list of transformers and estimators in a pipeline. This pipeline is then fitted to a DataFrame corresponding to the training set. The fitting produces a PipelineModel instance, equivalent to the pipeline but with estimators replaced by transformers, as shown in this diagram.
Let's construct the pipeline for our logistic regression spam filter:
scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline
scala> val pipeline = new Pipeline().setStages(
  Array(indexer, tokenizer, hashingTF, classifier)
)
pipeline: Pipeline = pipeline_7488113e284d
Once the pipeline is defined, we fit it to the DataFrame holding the training set:
scala> val fittedPipeline = pipeline.fit(trainDF)
fittedPipeline: org.apache.spark.ml.PipelineModel = pipeline_089525c6f100
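When fitting a pipeline to a DataFrame, estimators and transformers are treated differently. Transformers are applied to the DataFrame and copied, as is, into the pipeline model. Estimators are fitted to the DataFrame that reaches them, producing a transformer; that transformer is then applied to the DataFrame and added to the pipeline model in the estimator's place.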
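These fitting rules can be sketched in plain Scala. Everything below is hypothetical: a "DataFrame" is again a sequence of maps, and the stages are toys. A stage is either an estimator (Left) or a ready-made transformer (Right). The estimator in the example needs the len column created by the transformer before it, which is why the stages are fitted in order:

```scala
object PipelineSketch {
  // Toy "DataFrame": a sequence of rows, each mapping column names to values.
  type Table = Seq[Map[String, Any]]

  trait Transformer { def transform(t: Table): Table }
  trait Estimator { def fit(t: Table): Transformer }

  // A stage is either an estimator still to be fitted (Left)
  // or a ready-made transformer (Right).
  type Stage = Either[Estimator, Transformer]

  // The fitted pipeline: transformers only, applied in order.
  final case class PipelineModel(stages: Seq[Transformer]) extends Transformer {
    def transform(t: Table): Table = stages.foldLeft(t)((acc, s) => s.transform(acc))
  }

  // Walk the stages in order. Each estimator is fitted to the table produced
  // by the stages before it; the resulting transformer (or the ready-made one)
  // is applied to that table and appended to the model.
  def fit(stages: Seq[Stage], t: Table): PipelineModel = {
    val (_, fitted) = stages.foldLeft((t, Vector.empty[Transformer])) {
      case ((table, acc), stage) =>
        val transformer = stage match {
          case Left(estimator) => estimator.fit(table)
          case Right(ready)    => ready
        }
        (transformer.transform(table), acc :+ transformer)
    }
    PipelineModel(fitted)
  }
}

import PipelineSketch._

// Hypothetical transformer: adds a "len" column with the length of "text".
object AddLength extends Transformer {
  def transform(t: Table): Table =
    t.map(row => row + ("len" -> row("text").toString.length))
}

// Hypothetical estimator: learns the mean "len" of the table it is fitted on
// and returns a transformer flagging rows longer than that mean.
object LongTextFlagger extends Estimator {
  def fit(t: Table): Transformer = {
    val mean = t.map(_("len").asInstanceOf[Int]).sum.toDouble / t.size
    new Transformer {
      def transform(u: Table): Table =
        u.map(row => row + ("long" -> (row("len").asInstanceOf[Int] > mean)))
    }
  }
}

val train: Table = Seq(Map("text" -> "hi"), Map("text" -> "hello there"))
val model = PipelineSketch.fit(Seq(Right(AddLength), Left(LongTextFlagger)), train)

val testTable: Table = Seq(Map("text" -> "a much longer message"), Map("text" -> "ok"))
model.transform(testTable).foreach(println)
```

The estimator learns its cut-off (6.5 characters, the mean length of the two training messages) from the training data only; the fitted model then applies that fixed cut-off to new data. A real implementation can skip the final transform call during fitting, since its output is never used.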
We can now apply the pipeline model to the test set:
scala> val testDFWithPredictions = fittedPipeline.transform(testDF)
testDFWithPredictions: DataFrame = [fileName: string, ...
This has added a prediction column to the DataFrame with the predictions of our logistic regression model. To measure the performance of our algorithm, we count the misclassified messages in the test set:
scala> testDFWithPredictions.filter { $"label" !== $"prediction" }.count
Long = 20
Thus, our naive logistic regression algorithm, with no model selection or regularization, misclassifies 2.3% of e-mails. You may, of course, get slightly different results, since the train-test split was random.
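Dividing the count of misclassified messages by testDFWithPredictions.count gives the error rate directly. Since what follows cares about the kind of error, not just how many errors there are, it also helps to split the errors into false positives and false negatives. The sketch below does this in plain Scala over (label, prediction) pairs, assuming labels and predictions are 0.0 (ham) or 1.0 (spam) as produced above; the Evaluation object and the toy data are hypothetical:

```scala
object Evaluation {
  // Confusion counts for a binary classifier: 1.0 = spam (positive), 0.0 = ham.
  final case class Counts(truePos: Int, falsePos: Int, trueNeg: Int, falseNeg: Int) {
    def total: Int = truePos + falsePos + trueNeg + falseNeg
    // Fraction of messages whose prediction differs from their label.
    def errorRate: Double = (falsePos + falseNeg).toDouble / total
    // Fraction of legitimate (ham) messages wrongly flagged as spam.
    def falsePositiveRate: Double = falsePos.toDouble / (falsePos + trueNeg)
  }

  // Assumes every label and prediction is exactly 0.0 or 1.0.
  def counts(labelAndPrediction: Seq[(Double, Double)]): Counts =
    labelAndPrediction.foldLeft(Counts(0, 0, 0, 0)) { case (c, (label, prediction)) =>
      if (label == 1.0 && prediction == 1.0) c.copy(truePos = c.truePos + 1)
      else if (label == 0.0 && prediction == 1.0) c.copy(falsePos = c.falsePos + 1)
      else if (label == 0.0) c.copy(trueNeg = c.trueNeg + 1)
      else c.copy(falseNeg = c.falseNeg + 1)
    }
}

// Toy (label, prediction) pairs, invented for illustration.
val toy = Seq((1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0))
val c = Evaluation.counts(toy)
println(c) // prints Counts(1,1,2,1)
println(f"error rate: ${c.errorRate}%.2f, false positive rate: ${c.falsePositiveRate}%.2f")
```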
Let's save the training and test DataFrames, with predictions, as Parquet files:
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> (labelledDFWithPredictions
  .select("fileName", "label", "prediction", "probability")
  .write.mode(SaveMode.Overwrite)
  .parquet("transformedTrain.parquet"))
scala> (testDFWithPredictions
  .select("fileName", "label", "prediction", "probability")
  .write.mode(SaveMode.Overwrite)
  .parquet("transformedTest.parquet"))
In spam classification, a false positive is considerably worse than a false negative: it is much worse to classify a legitimate message as spam than it is to let a spam message through. To account for this, we could increase the threshold for classification: only messages that score, for instance, 0.7 or above would get classified as spam. This raises the obvious question of choosing the right threshold. One way to do this would be to investigate the false positive rate incurred in the test set for different thresholds, and to choose the lowest threshold that gives an acceptable false positive rate. A good way of visualizing this is to use ROC curves, which we will investigate in the next section.
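The threshold search described above can be sketched in plain Scala over (label, probability of spam) pairs. The data is invented for illustration and the helper names are hypothetical; this sketch counts a message as spam when its probability strictly exceeds the threshold. Once a threshold is chosen, it could be passed to the classifier through its threshold parameter (listed by explainParams above), for instance with setThreshold:

```scala
object ThresholdSketch {
  // Fraction of ham messages (label 0.0) whose spam probability strictly
  // exceeds the threshold, i.e. that would be flagged as spam.
  def falsePositiveRate(scored: Seq[(Double, Double)], threshold: Double): Double = {
    val hamProbs = scored.collect { case (label, p) if label == 0.0 => p }
    hamProbs.count(_ > threshold).toDouble / hamProbs.size
  }

  // Lowest candidate threshold whose false positive rate is acceptable, if any.
  def lowestAcceptableThreshold(scored: Seq[(Double, Double)],
                                candidates: Seq[Double],
                                maxFpr: Double): Option[Double] =
    candidates.sorted.find(t => falsePositiveRate(scored, t) <= maxFpr)
}

// Toy (label, P(spam)) pairs, invented for illustration.
val scored = Seq((1.0, 0.95), (1.0, 0.80), (0.0, 0.75), (0.0, 0.40), (0.0, 0.10), (1.0, 0.55))
val candidates = Seq(0.5, 0.6, 0.7, 0.8, 0.9)
for (t <- candidates) {
  println(f"threshold $t%.1f: false positive rate ${ThresholdSketch.falsePositiveRate(scored, t)}%.2f")
}
println(ThresholdSketch.lowestAcceptableThreshold(scored, candidates, maxFpr = 0.0)) // prints Some(0.8)
```

Raising the threshold to 0.8 removes the one false positive in this toy data, at the cost of letting the spam message scored 0.55 (and, at 0.8, the one scored 0.80) through: the trade-off that ROC curves make visible.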