In this section, we use a portion of the dataset from the previous chapter to present the ideas of PySpark ML.
If you did not download the data while reading the previous chapter, you can get it here: http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz.
Once again, we will attempt to predict an infant's chances of survival.
First, we load the data with the help of the following code:
import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv('births_transformed.csv.gz',
                        header=True,
                        schema=schema)
We specify the schema of the DataFrame; our severely limited dataset now only has 17 columns.
Before we can use the dataset to estimate a model, we need to do some transformations. Since statistical models can only operate on numeric data, we will have to encode the BIRTH_PLACE variable.
Before we do any of this, since we will use a number of different feature transformations later in this chapter, let's import them all:
import pyspark.ml.feature as ft
To encode the BIRTH_PLACE column, we will use the OneHotEncoder class. However, it cannot accept StringType columns; it can only deal with numeric types, so we will first cast the column to an IntegerType:
births = births.withColumn(
    'BIRTH_PLACE_INT',
    births['BIRTH_PLACE'].cast(typ.IntegerType()))
Having done this, we can now create our first Transformer:
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT',
    outputCol='BIRTH_PLACE_VEC')
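To make the idea of one-hot encoding concrete, here is a minimal plain-Python sketch. It is an illustration only, not PySpark's implementation: the helper name one_hot and the three sample category indices are hypothetical. It mirrors two behaviours of the PySpark encoder, namely that the last category is dropped by default (dropLast=True) and that each remaining category gets its own position in the vector. PySpark emits a sparse vector; the sketch uses dense lists for readability.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for a category index (hypothetical helper)."""
    # With drop_last, the final category is encoded as an all-zeros vector,
    # so the vector needs one position fewer than there are categories.
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Encode three made-up category indices: 0, 1 and 2.
encoded = [one_hot(i, num_categories=3) for i in (0, 1, 2)]
print(encoded)
```

Dropping the last category avoids a redundant column: if all other positions are zero, the row must belong to the dropped category.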
Let's now create a single column with all the features collated together. We will use the VectorAssembler transformer:
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] +
              [encoder.getOutputCol()],
    outputCol='features'
)
The inputCols parameter passed to the VectorAssembler object is a list of all the columns to be combined together to form the outputCol, the 'features' column. Note that we use the name of the encoder's output column (by calling the .getOutputCol() method), so we do not have to remember to change this parameter's value should we change the name of the output column in the encoder object at any point.
It's now time to create our first estimator.
In this example, we will (once again) use the logistic regression model. However, later in the chapter, we will showcase some more complex models from the .classification set of PySpark ML models, so we load the whole section:
import pyspark.ml.classification as cl
Once loaded, let's create the model by using the following code:
logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT')
We would not have to specify the labelCol parameter if our target column had the name 'label'. Also, if the output of our featuresCreator was not called 'features', we would have to specify the featuresCol by (most conveniently) calling the getOutputCol() method on the featuresCreator object.
All that is left now is to create a Pipeline and fit the model. First, let's load the Pipeline from the ML package:
from pyspark.ml import Pipeline
Creating a Pipeline is really easy. Conceptually, our pipeline chains three stages in order: the encoder, the featuresCreator, and the logistic estimator. Converting this structure into a Pipeline is a walk in the park:
pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    logistic
])
That's it! Our pipeline is now created, so we can (finally!) estimate the model.
Before we fit the model, we need to split our dataset into training and testing datasets. Conveniently, the DataFrame API has the .randomSplit(...) method:
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
The first parameter is a list of dataset proportions that should end up in, respectively, the births_train and births_test subsets. The seed parameter provides a seed to the randomizer.
You can also split the dataset into more than two subsets, as long as you unpack the output into as many variables. The elements of the list should sum to 1; if they do not, PySpark normalizes them.
For example, we could split the births dataset into three subsets like this:
train, test, val = births.randomSplit([0.7, 0.2, 0.1], seed=666)
The preceding code would put a random 70% of the births dataset into the train object, 20% would go to test, and the val DataFrame would hold the remaining 10%.
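The following plain-Python sketch shows one way a weighted random split can work; it is an assumption-laden illustration, not Spark's actual sampling code (Spark samples within partitions). The random_split helper is hypothetical. It demonstrates two points worth remembering: weights are normalized before use, and the resulting subset sizes are only approximately proportional to the weights.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one subset with probability proportional to its weight."""
    total = sum(weights)
    # Build cumulative upper bounds from the normalized weights.
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point round-off

    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                parts[i].append(row)
                break
    return parts


# Weights of 7, 2 and 1 behave like 0.7, 0.2 and 0.1 after normalization.
train, test, val = random_split(range(1000), [7, 2, 1], seed=666)
print(len(train), len(test), len(val))
```

Because every row is assigned independently, the subsets are disjoint and together cover the whole dataset, but their sizes will deviate slightly from an exact 70/20/10 split.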
Now it is about time to finally run our pipeline and estimate our model:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
The .fit(...) method of the pipeline object takes our training dataset as an input. Under the hood, the births_train dataset is passed first to the encoder object. The DataFrame that is created at the encoder stage then gets passed to the featuresCreator, which creates the 'features' column. Finally, the output from this stage is passed to the logistic object, which estimates the final model.
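The stage-by-stage flow described above can be sketched in plain Python. This is a simplified mental model rather than PySpark's implementation: all class and function names below (AddOne, MeanCenterer, MeanModel, fit_pipeline, transform_pipeline) are hypothetical toy stand-ins, and the data is a plain list instead of a DataFrame.

```python
class AddOne:
    """Toy transformer: it only knows how to transform data."""

    def transform(self, data):
        return [x + 1 for x in data]


class MeanModel:
    """Toy fitted model: subtracts a mean learned during fitting."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class MeanCenterer:
    """Toy estimator: fit() learns from data and returns a fitted model."""

    def fit(self, data):
        return MeanModel(sum(data) / len(data))


def fit_pipeline(stages, data):
    """Fit the stages in order, feeding each stage's output to the next."""
    fitted = []
    for stage in stages:
        if hasattr(stage, 'fit'):
            stage = stage.fit(data)  # estimators become fitted models
        fitted.append(stage)
        data = stage.transform(data)
    return fitted


def transform_pipeline(fitted, data):
    """Apply the fitted stages in order to new data."""
    for stage in fitted:
        data = stage.transform(data)
    return data


fitted_stages = fit_pipeline([AddOne(), MeanCenterer()], [1, 2, 3])
result = transform_pipeline(fitted_stages, [10])
print(result)
```

The key design point is that fitting produces a list of fitted stages that can be replayed on unseen data, which is the role the PipelineModel plays in PySpark.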
The .fit(...) method returns the PipelineModel object (the model object in the preceding snippet) that can then be used for prediction; we do this by calling the .transform(...) method and passing the testing dataset created earlier. The following command shows what test_model looks like:
test_model.take(1)
It generates the following output:
As you can see, we get all the columns from the Transformers and Estimators. The logistic regression model outputs several columns: rawPrediction is the value of the linear combination of the features and the β coefficients, probability is the calculated probability for each of the classes, and finally, prediction is our final class assignment.
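The relationship between these three columns can be sketched in plain Python for the binary case. The sketch makes several assumptions that are not stated in the text: that the two raw scores are the negated and the plain margin, that the probability of the positive class is the logistic function of the margin, and that the class is chosen with a 0.5 threshold. The helper name logistic_outputs and the sample coefficients are hypothetical.

```python
import math


def logistic_outputs(features, coefficients, intercept, threshold=0.5):
    """Return (raw_prediction, probability, prediction) for one row."""
    # Linear combination of the features and the coefficients.
    margin = sum(x * b for x, b in zip(features, coefficients)) + intercept
    # Logistic (sigmoid) function maps the margin to a probability.
    p_positive = 1.0 / (1.0 + math.exp(-margin))
    raw_prediction = [-margin, margin]            # assumed layout: class 0, class 1
    probability = [1.0 - p_positive, p_positive]  # sums to 1
    prediction = 1.0 if p_positive > threshold else 0.0
    return raw_prediction, probability, prediction


# Made-up feature values and coefficients, for illustration only.
raw, prob, pred = logistic_outputs([1.0, 2.0], [0.5, -0.25], intercept=1.0)
print(raw, prob, pred)
```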
Obviously, we would now like to test how well our model did. PySpark exposes a number of evaluation methods for classification and regression in the .evaluation section of the package:
import pyspark.ml.evaluation as ev
We will use the BinaryClassificationEvaluator to test how well our model performed:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT')
The rawPredictionCol can either be the rawPrediction column produced by the estimator or the probability.
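One way to see why either column works for the area under the ROC curve is that this metric depends only on how the rows are ranked, and the logistic function preserves that ranking. The plain-Python sketch below computes the area as the fraction of positive/negative pairs in which the positive row scores higher (ties count as half). It is a hand-rolled illustration with made-up scores and labels, not the evaluator's implementation; the helper name area_under_roc is hypothetical.

```python
import math


def area_under_roc(scores, labels):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and raw margins for four rows.
labels = [0, 0, 1, 1]
margins = [-2.0, 0.5, 0.1, 3.0]
# Probabilities are a monotonic transformation of the margins.
probabilities = [1.0 / (1.0 + math.exp(-m)) for m in margins]

auc_raw = area_under_roc(margins, labels)
auc_prob = area_under_roc(probabilities, labels)
print(auc_raw, auc_prob)
```

Both calls return the same value because three of the four positive/negative pairs are ordered correctly regardless of which score is used.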
Let's see how well our model performed:
print(evaluator.evaluate(test_model,
      {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
      {evaluator.metricName: 'areaUnderPR'}))
The preceding code produces the following result:
An area under the ROC curve of 74% and an area under the PR curve of 71% show a well-defined model, but nothing extraordinary; if we had other features, we could drive these up, but this is not the purpose of this chapter (nor the book, for that matter).
PySpark allows you to save the Pipeline definition for later use. It saves not only the pipeline structure, but also the definitions of all the Transformers and Estimators:
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
So, you can load it up later and use it straight away to .fit(...) and predict:
loadedPipeline = Pipeline.load(pipelinePath)
(loadedPipeline
    .fit(births_train)
    .transform(births_test)
    .take(1))
The preceding code produces the same result (as expected):
If, however, you want to save the estimated model, you can also do that; instead of saving the Pipeline, you need to save the PipelineModel.
To save your model, see the following example:
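from pyspark.ml import PipelineModel

# Save the fitted model, overwriting any previous copy at this path
modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

# Reload the fitted model and use it to predict without refitting
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(births_test)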
The preceding script uses the .load(...) method, a class method of the PipelineModel class, to reload the estimated model. You can compare the result of test_reloadedModel.take(1) with the output of test_model.take(1) we presented earlier.