Model training for prediction

Inside the project, in the package folder prediction.training, there is a Scala object called TrainGBT (in the file TrainGBT.scala). Before launching it, you have to set or check four things:

  • In the code, point spark.sql.warehouse.dir at a real location on your computer that has several gigabytes of free space: .config("spark.sql.warehouse.dir", "/home/user/spark")
  • rootDir is the main folder, where all input files and trained models are stored: rootDir = "/home/user/projects/btc-prediction/"
  • Make sure that the x filename matches the one produced by the Scala script in the preceding step: x = spark.read.format("com.databricks.spark.csv").schema(xSchema).load(rootDir + "scala_test_x.csv"). On Spark 2.x the built-in "csv" format can be used instead, as in the full listing later in this section.
  • Make sure that the y filename matches the one produced by the same Scala script: y_tmp = spark.read.format("com.databricks.spark.csv").schema(ySchema).load(rootDir + "scala_test_y.csv")
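
Since a wrong rootDir or filename only shows up once Spark tries to read the data, it can help to check the paths before starting a long training run. The following is a minimal sketch in plain Scala; InputCheck and missingInputs are hypothetical names, not part of the project, and the two filenames are the ones from the list above:

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper, not part of the project: reports which of the
// expected input files are missing under rootDir.
object InputCheck {
  // Filenames taken from the list above; adjust them if your
  // preprocessing step writes different names.
  val expectedFiles: Seq[String] = Seq("scala_test_x.csv", "scala_test_y.csv")

  def missingInputs(rootDir: String): Seq[Path] =
    expectedFiles
      .map(name => Paths.get(rootDir, name))
      .filterNot(p => Files.exists(p))
}
```

Calling InputCheck.missingInputs("/home/user/projects/btc-prediction/") returns an empty sequence when both files are in place. Files.exists is also true for directories, so the check still passes if the CSV was written by Spark as a folder of part files.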

The training code uses the Apache Spark ML library (and the libraries it depends on) to train the classifier, so they have to be on your classpath when you run it. Since the whole project uses SBT, the easiest way to do that is to run it from the project root folder by typing sbt "run-main prediction.training.TrainGBT" (the task is called runMain in newer SBT versions), which resolves all dependencies and launches the training.

Depending on the number of iterations and the tree depth, it can take several hours to train the model. Now let us see how training is performed, using the gradient-boosted trees model as the example. First, we need to create a SparkSession object:

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/home/user/spark/")
  .appName("Bitcoin Preprocessing")
  .getOrCreate()

Then, we define a schema of data for x and y. We rename the columns to t0-t21, to indicate that it's a time series:

val xSchema = StructType(Array(
  StructField("t0", DoubleType, true),
  StructField("t1", DoubleType, true),
  StructField("t2", DoubleType, true),
  StructField("t3", DoubleType, true),
  StructField("t4", DoubleType, true),
  StructField("t5", DoubleType, true),
  StructField("t6", DoubleType, true),
  StructField("t7", DoubleType, true),
  StructField("t8", DoubleType, true),
  StructField("t9", DoubleType, true),
  StructField("t10", DoubleType, true),
  StructField("t11", DoubleType, true),
  StructField("t12", DoubleType, true),
  StructField("t13", DoubleType, true),
  StructField("t14", DoubleType, true),
  StructField("t15", DoubleType, true),
  StructField("t16", DoubleType, true),
  StructField("t17", DoubleType, true),
  StructField("t18", DoubleType, true),
  StructField("t19", DoubleType, true),
  StructField("t20", DoubleType, true),
  StructField("t21", DoubleType, true))
)
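
The 22 column names follow a simple pattern, so they can also be generated instead of typed out. This is a small plain-Scala sketch of that idea; FeatureColumns is a hypothetical name, not something from the project:

```scala
// Hypothetical helper, not part of the project: generates the column
// names t0 to t21 used for the x data.
object FeatureColumns {
  // Number of time-series columns in x.
  val count: Int = 22

  // Array("t0", "t1", ..., "t21")
  val names: Array[String] = (0 until count).map(i => s"t$i").toArray
}

// With Spark on the classpath, a schema with the same fields as xSchema
// could then be built as:
//   StructType(FeatureColumns.names.map(n => StructField(n, DoubleType, true)))
```

The same array could be passed to VectorAssembler.setInputCols later on, which keeps the schema and the feature list from drifting apart.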

Then we read the two files using the schemas we defined. It was more convenient to generate two separate files in Scala for data and labels, so here we have to join them into a single DataFrame:

import spark.implicits._
val y = y_tmp.withColumn("y", 'y.cast(IntegerType))
import org.apache.spark.sql.functions._

val x_id = x.withColumn("id", monotonically_increasing_id())
val y_id = y.withColumn("id", monotonically_increasing_id())
val data = x_id.join(y_id, "id")
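
This join pairs the right rows only if x and y receive identical IDs row for row. Spark documents the IDs from monotonically_increasing_id as unique and increasing but not consecutive: the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Scala simulation below (no Spark involved; MonotonicIdSketch and its methods are hypothetical names) shows what that means when two datasets of the same length are partitioned differently:

```scala
// Simulation only: mimics the documented bit layout of
// monotonically_increasing_id, it does not call Spark.
object MonotonicIdSketch {
  // Partition index in the upper bits, row number within the
  // partition in the lower 33 bits.
  def id(partition: Int, rowInPartition: Long): Long =
    (partition.toLong << 33) | rowInPartition

  // IDs assigned to consecutive rows, given how many rows
  // each partition holds.
  def idsFor(partitionSizes: Seq[Int]): Seq[Long] =
    partitionSizes.zipWithIndex.flatMap { case (size, p) =>
      (0 until size).map(r => id(p, r.toLong))
    }
}
```

For six rows split as 3 + 3 the fourth row gets ID 8589934592, while for a 4 + 2 split it gets ID 3, so a join on "id" would no longer line the rows up. If the two files might be partitioned differently, one alternative is to index the rows with the RDD method zipWithIndex, which assigns consecutive indices.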

The next step is required by Spark ML, which expects all features in a single vector column, so we assemble the 22 columns into one:

val featureAssembler = new VectorAssembler()
  .setInputCols(Array("t0", "t1", "t2", "t3",
    "t4", "t5", "t6", "t7",
    "t8", "t9", "t10", "t11",
    "t12", "t13", "t14", "t15",
    "t16", "t17", "t18", "t19",
    "t20", "t21"))
  .setOutputCol("features")

We split the data randomly into training and test sets in the proportion 75% to 25%. We fix the seed so that every training run gets the same split. Here, dataWithLabels is data with an added label column of type Double derived from y (see the full listing later in this section):

val Array(trainingData, testData) = dataWithLabels.randomSplit(Array(0.75, 0.25), 123)
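
To see why a fixed seed makes the split repeatable, here is a plain-Scala imitation of a seeded random split. It is only a sketch of the idea, not Spark's implementation, and SeededSplit is a hypothetical name:

```scala
import scala.util.Random

// Sketch only: imitates a seeded random split, it is not Spark's code.
object SeededSplit {
  // Sends each row to the training set with probability trainFraction,
  // drawing from a generator initialised with `seed`.
  def split[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val rng = new Random(seed)
    // Draw exactly one random number per row, in row order.
    val tagged = rows.toVector.map(row => (row, rng.nextDouble() < trainFraction))
    val (train, test) = tagged.partition(_._2)
    (train.map(_._1), test.map(_._1))
  }
}
```

Calling SeededSplit.split(rows, 0.75, 123L) twice returns the same two sets, while a different seed gives a different split. Because every row is assigned independently, the training set holds roughly, not exactly, 75% of the rows.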

We then define the model. The definition names the label column and the features column, and sets the training parameters (here the number of boosting iterations and the random seed):

val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setSeed(123)

Next, we create a pipeline with two stages: assembling the feature vector and running GBT:

val pipeline = new Pipeline()
  .setStages(Array(featureAssembler, gbt))

Then we define the evaluator, which is how cross-validation decides whether a model is doing well or not. Because we have only two classes and they are imbalanced, accuracy is a poor measure; the area under the ROC curve is better:

val rocEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
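
To make the point about imbalanced classes concrete, the sketch below computes both measures by hand in plain Scala. It uses the rank interpretation of the area under the ROC curve (the probability that a random positive example scores higher than a random negative one); this is not how Spark's evaluator computes it, and AucSketch is a hypothetical name:

```scala
// Sketch only: a direct O(positives * negatives) computation for
// small examples, not Spark's BinaryClassificationEvaluator.
object AucSketch {
  // Area under the ROC curve as the fraction of (positive, negative)
  // pairs in which the positive example has the higher score;
  // ties count as one half.
  def areaUnderROC(scoresAndLabels: Seq[(Double, Int)]): Double = {
    val positives = scoresAndLabels.collect { case (s, 1) => s }
    val negatives = scoresAndLabels.collect { case (s, 0) => s }
    require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
    val wins = for {
      p <- positives
      n <- negatives
    } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (positives.size.toLong * negatives.size)
  }

  // Fraction of (predicted label, true label) pairs that agree.
  def accuracy(predictedAndLabels: Seq[(Int, Int)]): Double =
    predictedAndLabels.count { case (p, l) => p == l }.toDouble / predictedAndLabels.size
}
```

With 90 negative and 10 positive examples, a model that always predicts class 0 reaches an accuracy of 0.9, yet its constant score gives an area under the ROC curve of 0.5, the same as random guessing.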

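K-fold cross-validation is used to reduce the risk of overfitting to one particular split: at each iteration it holds out one of the numFolds folds, trains the model on the remaining folds, and then evaluates it on the held-out fold. With five folds, one-fifth of the training data is held out each time; the full listing later in this section sets numFolds = 10, so one-tenth is. Both paramGrid and numFolds are defined in that listing:

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(rocEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(numFolds)
  .setSeed(123)
val cvModel = cv.fit(trainingData)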
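
The fold mechanics, and the reason the search takes so long, can be sketched in plain Scala. Spark assigns rows to folds randomly; the round-robin assignment below is a simplification, and CrossValidationSketch is a hypothetical name:

```scala
// Sketch only: round-robin folds instead of Spark's random assignment.
object CrossValidationSketch {
  // Puts row index i into fold i % numFolds and returns, for each fold,
  // the pair (training indices, validation indices).
  def folds(numRows: Int, numFolds: Int): Seq[(Seq[Int], Seq[Int])] =
    (0 until numFolds).map { k =>
      val (validation, training) = (0 until numRows).partition(_ % numFolds == k)
      (training, validation)
    }

  // Number of models trained by the search: one per parameter
  // combination per fold, plus a final fit of the best combination
  // on all of the training data.
  def numberOfFits(gridSizes: Seq[Int], numFolds: Int): Int =
    gridSizes.product * numFolds + 1
}
```

For the values in the full listing (one value each for maxIter and maxDepth, three for maxBins, and numFolds = 10), numberOfFits(Seq(1, 1, 3), 10) gives 31 model fits, which is why widening the grid quickly multiplies the training time.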

After we get the trained model (which can take an hour or more depending on the number of iterations and parameters we want to iterate on, specified in paramGrid), we then compute the predictions on the test data:

val predictions = cvModel.transform(testData)

We then evaluate the quality of the predictions:

val roc = rocEvaluator.evaluate(predictions)

The trained model is saved for later usage by the prediction service:

val gbtModel = cvModel.bestModel.asInstanceOf[PipelineModel]
gbtModel.save(rootDir + "__cv__gbt_22_binary_classes_" + System.nanoTime() / 1000000 + ".model")
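
One detail about the filename: System.nanoTime() measures elapsed time from an arbitrary origin, so the number in the name makes each save distinct but does not tell you when the model was trained. If a real timestamp is wanted, a variation like the following could be used; ModelPath is a hypothetical helper, not part of the project:

```scala
// Hypothetical naming helper: same prefix and suffix as above, but the
// number is wall-clock milliseconds since the Unix epoch.
object ModelPath {
  def forTimestamp(rootDir: String, epochMillis: Long): String =
    rootDir + "__cv__gbt_22_binary_classes_" + epochMillis + ".model"

  // Path for a model saved right now.
  def now(rootDir: String): String =
    forTimestamp(rootDir, System.currentTimeMillis())
}
```

The prediction service can later restore a model saved this way with PipelineModel.load, passing the same path.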

In summary, the code for model training is given as follows:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

object TrainGradientBoostedTree {
  def main(args: Array[String]): Unit = {
    // Hyperparameter values explored by the grid search
    val maxBins = Seq(5, 7, 9)
    val numFolds = 10
    val maxIter: Seq[Int] = Seq(10)
    val maxDepth: Seq[Int] = Seq(20)
    val rootDir = "output/"

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "/home/user/spark/")
      .appName("Bitcoin Preprocessing")
      .getOrCreate()

    val xSchema = StructType(Array(
      StructField("t0", DoubleType, true),
      StructField("t1", DoubleType, true),
      StructField("t2", DoubleType, true),
      StructField("t3", DoubleType, true),
      StructField("t4", DoubleType, true),
      StructField("t5", DoubleType, true),
      StructField("t6", DoubleType, true),
      StructField("t7", DoubleType, true),
      StructField("t8", DoubleType, true),
      StructField("t9", DoubleType, true),
      StructField("t10", DoubleType, true),
      StructField("t11", DoubleType, true),
      StructField("t12", DoubleType, true),
      StructField("t13", DoubleType, true),
      StructField("t14", DoubleType, true),
      StructField("t15", DoubleType, true),
      StructField("t16", DoubleType, true),
      StructField("t17", DoubleType, true),
      StructField("t18", DoubleType, true),
      StructField("t19", DoubleType, true),
      StructField("t20", DoubleType, true),
      StructField("t21", DoubleType, true)))

    val ySchema = StructType(Array(StructField("y", DoubleType, true)))
    val x = spark.read.format("csv").schema(xSchema).load(rootDir + "scala_test_x.csv")
    val y_tmp = spark.read.format("csv").schema(ySchema).load(rootDir + "scala_test_y.csv")

    import spark.implicits._
    val y = y_tmp.withColumn("y", 'y.cast(IntegerType))

    import org.apache.spark.sql.functions._
    // Join the two separate datasets into a single Spark DataFrame
    val x_id = x.withColumn("id", monotonically_increasing_id())
    val y_id = y.withColumn("id", monotonically_increasing_id())
    val data = x_id.join(y_id, "id")

    val featureAssembler = new VectorAssembler()
      .setInputCols(Array("t0", "t1", "t2", "t3", "t4", "t5",
        "t6", "t7", "t8", "t9", "t10", "t11",
        "t12", "t13", "t14", "t15", "t16",
        "t17", "t18", "t19", "t20", "t21"))
      .setOutputCol("features")

    // Convert y (0 or 1) into the Double label column that Spark ML expects
    val encodeLabel = udf[Double, String] { case "1" => 1.0 case "0" => 0.0 }
    val dataWithLabels = data.withColumn("label", encodeLabel(data("y")))

    // 123 is the seed, so we get the same data split on every run while tuning params
    val Array(trainingData, testData) =
      dataWithLabels.randomSplit(Array(0.75, 0.25), 123)

    val gbt = new GBTClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)
      .setSeed(123)

    val pipeline = new Pipeline()
      .setStages(Array(featureAssembler, gbt))

    // ***********************************************************
    println("Preparing K-fold Cross Validation and Grid Search")
    // ***********************************************************
    val paramGrid = new ParamGridBuilder()
      .addGrid(gbt.maxIter, maxIter)
      .addGrid(gbt.maxDepth, maxDepth)
      .addGrid(gbt.maxBins, maxBins)
      .build()

    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator())
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(numFolds)
      .setSeed(123)

    // ************************************************************
    println("Training model with GradientBoostedTrees algorithm")
    // ************************************************************
    // Train the model. This also runs the feature assembler.
    val cvModel = cv.fit(trainingData)
    cvModel.save(rootDir + "cvGBT_22_binary_classes_" +
      System.nanoTime() / 1000000 + ".model")

    println("Evaluating model on test data and calculating area under ROC and PR curves")
    // **********************************************************************
    // Make predictions on the held-out test data
    val predictions = cvModel.transform(testData)

    // Compute the area under the ROC and PR curves for the test predictions
    val rocEvaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")
    val roc = rocEvaluator.evaluate(predictions)

    val prEvaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderPR")
    val pr = prEvaluator.evaluate(predictions)

    // Save the best pipeline found by cross-validation for the prediction service
    val gbtModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    gbtModel.save(rootDir + "__cv__gbt_22_binary_classes_" +
      System.nanoTime() / 1000000 + ".model")

    println("Area under ROC curve = " + roc)
    println("Area under PR curve= " + pr)
    // Show one sample row of true label and predicted label
    predictions.select("label", "prediction").show(1)
    spark.stop()
  }
}

Now let us see how the training went:

>>> 
Area under ROC curve = 0.6045355104779828
Area under PR curve= 0.3823834607704922

So the result is not very strong: the area under the ROC curve is only about 0.60 (60.45%) for the best GBT model found. Nevertheless, tuning the hyperparameters, for example with more iterations or a wider parameter grid, is likely to improve it.

However, as I did not have enough time, I did not run the training for long, but you should definitely try.
