Spark ML is an API built on top of the DataFrames API of Spark SQL for constructing machine learning pipelines. Spark ML is inspired by the scikit-learn project and makes it easy to combine multiple algorithms into a single pipeline. The following concepts are used in ML pipelines:

- DataFrame: The ML dataset, which can hold a variety of data types such as text, feature vectors, labels, and predictions.
- Transformer: An algorithm that transforms one DataFrame into another; for example, a feature transformer or a learned model.
- Estimator: An algorithm that is fit on a DataFrame to produce a Transformer; for example, a learning algorithm that trains on a DataFrame and produces a model.
- Pipeline: A chain of multiple Transformers and Estimators that together specify an ML workflow.
- Parameter: A common API for Transformers and Estimators to specify parameters.
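As a minimal sketch of the distinction between a Transformer and an Estimator (assuming an active SparkSession named spark; the tiny DataFrame here is illustrative only):

>>> from pyspark.ml.feature import Tokenizer
>>> from pyspark.ml.classification import LogisticRegression
>>> df = spark.createDataFrame([(0, "spark ml pipeline")], ["id", "text"])
>>> # A Transformer maps one DataFrame to another with transform()
>>> Tokenizer(inputCol="text", outputCol="words").transform(df).show()
>>> # An Estimator instead exposes fit(), which trains on a DataFrame
>>> # and returns a Model, itself a Transformer
>>> lr = LogisticRegression(maxIter=10)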
Let's see how a pipeline works with a simple document workflow. Figure 7.1 illustrates the workflow during training time:
The pipeline workflow to create a PipelineModel can be understood with the following points:

- The Pipeline.fit() method is called on the first DataFrame with raw documents.
- The Tokenizer.transform() method splits the raw text documents into words and adds a new column with the words to the DataFrame.
- The HashingTF.transform() method converts the words to feature vectors, adding a new column with the vectors to the DataFrame.
- The pipeline then calls LogisticRegression.fit() to produce a logistic regression model.
- After a Pipeline's fit() method runs, it produces a PipelineModel, which is a Transformer.

This PipelineModel is used at test time. Figure 7.2 illustrates the usage at test time:
The PipelineModel can be understood with the following points:

- The PipelineModel and the original Pipeline have the same number of stages, but all Estimators in the original Pipeline have become Transformers.
- When the transform() method is called on a test dataset, the dataset is passed through the fitted pipeline in order.
- Each stage's transform() method updates the dataset and passes it to the next stage, as shown in the preceding image.
- Pipelines and PipelineModels ensure that training and test data go through identical feature processing steps.

The following example implements the simple text document pipeline illustrated in the preceding figures. First, import the required classes:
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.feature import HashingTF, Tokenizer
>>> from pyspark.sql import Row
Prepare a training DataFrame from a list of (id, text, label) tuples:

>>> LabeledDocument = Row("id", "text", "label")
>>> training_df = spark.createDataFrame([
...     LabeledDocument(0, "apache spark rdd memory", 1.0),
...     LabeledDocument(1, "mllib pipeline", 0.0),
...     LabeledDocument(2, "hadoop mahout", 1.0),
...     LabeledDocument(3, "mapreduce iterative", 0.0)])
>>> training_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- label: double (nullable = true)
>>> training_df.show()
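To preview what the feature stages will produce (a side demonstration that runs the stages outside the pipeline; it is not part of the original flow), you can apply Tokenizer and HashingTF directly:

>>> from pyspark.ml.feature import HashingTF, Tokenizer
>>> words_df = Tokenizer(inputCol="text", outputCol="words").transform(training_df)
>>> HashingTF(inputCol="words", outputCol="features").transform(words_df) \
...     .select("words", "features").show(truncate=False)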
Configure the ML pipeline, which consists of three stages (tokenizer, hashing TF, and logistic regression), and fit it to the training DataFrame:

>>> tokenizer_split = Tokenizer(inputCol="text", outputCol="words")
>>> hashingTF_vectors = HashingTF(inputCol=tokenizer_split.getOutputCol(),
...     outputCol="features")
>>> log_reg = LogisticRegression(maxIter=10, regParam=0.01)
>>> pipeline = Pipeline(stages=[tokenizer_split, hashingTF_vectors, log_reg])
>>> model = pipeline.fit(training_df)
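As a quick check (a sketch relying on the stages attribute of PipelineModel), you can confirm that the LogisticRegression Estimator has been replaced by a fitted LogisticRegressionModel:

>>> for stage in model.stages:
...     print(type(stage).__name__)
...
Tokenizer
HashingTF
LogisticRegressionModel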
Prepare a test DataFrame from a list of (id, text) tuples:

>>> test_df = spark.createDataFrame([
...     (4, "p q r"),
...     (5, "mllib pipeline"),
...     (6, "x y z"),
...     (7, "hadoop mahout")], ["id", "text"])
>>> test_df.show()
Make predictions on the test documents and inspect the result:

>>> prediction = model.transform(test_df)
>>> prediction
DataFrame[id: bigint, text: string, words: array<string>, features: vector, rawPrediction: vector, probability: vector, prediction: double]
>>> selected = prediction.select("id", "text", "probability", "prediction")
>>> selected
DataFrame[id: bigint, text: string, probability: vector, prediction: double]
>>> for row in selected.collect():
...     print(row)
...
Row(id=4, text=u'p q r', probability=DenseVector([0.6427, 0.3573]), prediction=0.0)
Row(id=5, text=u'mllib pipeline', probability=DenseVector([0.9833, 0.0167]), prediction=0.0)
Row(id=6, text=u'x y z', probability=DenseVector([0.6427, 0.3573]), prediction=0.0)
Row(id=7, text=u'hadoop mahout', probability=DenseVector([0.0218, 0.9782]), prediction=1.0)
You can try predictions with different sets of test data.
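To score the model rather than eyeball individual predictions, you can hold out labeled data and use an evaluator. The following is a sketch with BinaryClassificationEvaluator; the held-out DataFrame eval_df is hypothetical and not part of the original example:

>>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
>>> eval_df = spark.createDataFrame([  # hypothetical labeled hold-out set
...     (8, "apache spark memory", 1.0),
...     (9, "mllib pipeline", 0.0)], ["id", "text", "label"])
>>> evaluator = BinaryClassificationEvaluator()  # defaults: areaUnderROC on rawPrediction/label
>>> evaluator.evaluate(model.transform(eval_df))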
Models and pipelines can be saved to storage systems and loaded back into machine learning programs later. Use the following commands to save and load models:
>>> model.save("/user/cloudera/spark-lr-model")
>>> pipeline.save("/user/cloudera/unfit-lr-model")
>>> from pyspark.ml import Pipeline, PipelineModel
>>> loadedModel = PipelineModel.load("/user/cloudera/spark-lr-model")
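The unfit pipeline saved above can also be reloaded and refit, for example on refreshed training data; a short sketch assuming the Spark 2.0+ ML persistence API:

>>> loadedPipeline = Pipeline.load("/user/cloudera/unfit-lr-model")
>>> refit_model = loadedPipeline.fit(training_df)
>>> refit_model.transform(test_df).select("id", "prediction").show()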