Building machine learning pipelines

Spark ML is an API built on top of the DataFrame API of Spark SQL for constructing machine learning pipelines. It is inspired by the scikit-learn project and makes it easy to combine multiple algorithms into a single pipeline. The following are the concepts used in ML pipelines:

  • DataFrame: A DataFrame holds data as rows and named columns, much like an RDBMS table. It can contain text, feature vectors, true labels, and predictions in its columns.
  • Transformer: A Transformer is an algorithm that transforms one DataFrame into another DataFrame. An ML model is an example of a Transformer: it transforms a DataFrame with features into a DataFrame with predictions.
  • Estimator: An Estimator is an algorithm that produces a Transformer by fitting on a DataFrame. A learning algorithm that trains a model is an example of an Estimator.
  • Pipeline: As the name indicates, a Pipeline creates a workflow by chaining multiple Transformers and Estimators together.
  • Parameter: This is the common API that Transformers and Estimators use to specify their parameters (see the short sketch after this list).
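
To make these concepts concrete, here is a minimal sketch (train_df and test_df are hypothetical DataFrames with a features column and a label column) of how an Estimator's fit() method returns a Transformer and how parameters are specified:

>>> from pyspark.ml.classification import LogisticRegression
>>> lr = LogisticRegression(featuresCol="features", labelCol="label")  # an Estimator
>>> print(lr.explainParams())                      # the Parameter API lists all params
>>> lr_model = lr.fit(train_df, {lr.maxIter: 5})   # fit() returns a Transformer (a model)
>>> predictions = lr_model.transform(test_df)      # the model adds prediction columns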

An example of a pipeline workflow

Let's see how a pipeline works with a simple document workflow. Figure 7.1 illustrates the workflow during training time:


Figure 7.1: An ML pipeline during training

The training workflow that creates a PipelineModel can be understood with the following points:

  • The top box shows a Pipeline with three stages. The first two stages are Transformers and the third one is an Estimator.
  • The bottom box shows data flowing through the pipeline to produce a logistic regression model. Each stage produces a DataFrame.
  • The Pipeline.fit() method is called on the first DataFrame with raw documents.
  • The Tokenizer.transform() method splits the raw text documents into words and then adds a new column with words to the DataFrame.
  • The HashingTF.transform() method converts the words to feature vectors by adding a new column with the vectors to the DataFrame.
  • Because LogisticRegression is an Estimator, the pipeline then calls LogisticRegression.fit() on the DataFrame with feature vectors to produce a logistic regression model.
  • After a pipeline's fit() method runs, it produces PipelineModel, which is a Transformer (a simplified sketch of these steps follows this list).
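
Conceptually, the pipeline's fit() call just runs each stage in order. A simplified sketch (using the stage and DataFrame names defined later in this section, not code you would normally write yourself) looks like this:

>>> words_df = tokenizer_split.transform(training_df)    # stage 1: Transformer
>>> features_df = hashingTF_vectors.transform(words_df)  # stage 2: Transformer
>>> lr_model = log_reg.fit(features_df)                  # stage 3: Estimator fits a model

The pipeline then wraps the fitted stages into a single PipelineModel.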

This PipelineModel is then used at test time, as Figure 7.2 illustrates:


Figure 7.2: An ML Pipeline during testing

The PipelineModel can be understood with the following points:

  • The PipelineModel has the same number of stages as the original pipeline, but every Estimator in the original pipeline has been replaced by the fitted Transformer (model) it produced (you can verify this with the sketch after this list).
  • Once the transform() method is called on a test dataset, the dataset is passed through the fitted pipeline in order.
  • The transform() method updates the dataset at every stage and passes it to the next stage, as shown in the preceding image.
  • Pipelines and PipelineModels ensure that training and testing data go through the same feature processing steps.
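
Once the pipeline in the next section has been fitted, you can verify this by listing the stages of the resulting PipelineModel (model is the fitted pipeline built below); all of them are now Transformers:

>>> for stage in model.stages:
         print(type(stage).__name__)

Tokenizer
HashingTF
LogisticRegressionModel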

Building an ML pipeline

The following example builds the simple text document pipeline illustrated in the preceding figures:

  1. Get into the PySpark shell and import all dependencies as follows:
    >>> from pyspark.ml import Pipeline
    >>> from pyspark.ml.classification import LogisticRegression
    >>> from pyspark.ml.feature import HashingTF, Tokenizer
    >>> from pyspark.sql import Row
    
  2. Prepare a training document DataFrame from a list of id, text, and label tuples:
    >>> LabeledDocument = Row("id", "text", "label")
    
    >>> training_df = spark.createDataFrame([
            (0, "apache spark rdd memory", 1.0),
            (1, "mllib pipeline", 0.0),
            (2, "hadoop mahout", 1.0),
            (3, "mapreduce iterative", 0.0)], ["id", "text", "label"])
    
    >>> training_df.printSchema()
    root
     |-- id: long (nullable = true)
     |-- text: string (nullable = true)
     |-- label: double (nullable = true)
    
    >>> training_df.show()
    

    Figure 7.3: A screenshot of the training DataFrame

  3. Configure an ML pipeline which consists of three stages—Tokenizer, HashingTF, and LogisticRegression:
    >>> tokenizer_split = Tokenizer(inputCol="text", outputCol="words")
    >>> hashingTF_vectors = HashingTF(inputCol=tokenizer_split.getOutputCol(), outputCol="features")
    >>> log_reg = LogisticRegression(maxIter=10, regParam=0.01)
    >>> pipeline = Pipeline(stages=[tokenizer_split, hashingTF_vectors, log_reg])
    >>> 
    >>> model = pipeline.fit(training_df)
    
  4. Prepare the test documents, which are unlabeled (id, text) tuples:
    >>> test_df = spark.createDataFrame([
            (4, "p q r"),
            (5, "mllib pipeline"),
            (6, "x y z"),
            (7, "hadoop mahout")], ["id", "text"])
    
    >>> test_df.show()
    

    Figure 7.4: A screenshot of the test DataFrame

  5. Make predictions on the test documents and print columns of interest:
    >>> prediction = model.transform(test_df)
    >>> prediction
    DataFrame[id: bigint, text: string, words: array<string>, features: vector, rawPrediction: vector, probability: vector, prediction: double]
    
    >>> selected = prediction.select("id", "text", "probability", "prediction")
    >>> selected
    DataFrame[id: bigint, text: string, probability: vector, prediction: double]
    
    >>> for row in selected.collect():
             print(row)
    
    Row(id=4, text=u'p q r', probability=DenseVector([0.6427, 0.3573]), prediction=0.0)
    Row(id=5, text=u'mllib pipeline', probability=DenseVector([0.9833, 0.0167]), prediction=0.0)
    Row(id=6, text=u'x y z', probability=DenseVector([0.6427, 0.3573]), prediction=0.0)
    Row(id=7, text=u'hadoop mahout', probability=DenseVector([0.0218, 0.9782]), prediction=1.0)
    

You can try predictions with different sets of test data.
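
For example, you can score a couple of new (made-up) sentences with the same fitted model:

>>> new_test_df = spark.createDataFrame([
        (8, "spark rdd"),
        (9, "mapreduce job")], ["id", "text"])
>>> new_prediction = model.transform(new_test_df)
>>> new_prediction.select("id", "text", "prediction").show()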

Saving and loading models

Models and pipelines can be saved to storage systems and loaded back into machine learning programs later. Use the following commands to save and load models:

>>> model.save("/user/cloudera/spark-lr-model")
>>> pipeline.save("/user/cloudera/unfit-lr-model")

>>> from pyspark.ml import Pipeline, PipelineModel
>>> loadedPipeline = Pipeline.load("/user/cloudera/unfit-lr-model")
>>> loadedModel = PipelineModel.load("/user/cloudera/spark-lr-model")
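
As a quick check, the reloaded PipelineModel behaves exactly like the original fitted model; for example, reusing test_df from the earlier steps:

>>> loadedModel.transform(test_df).select("id", "text", "prediction").show()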