PredictionActor and the prediction step

The Scala web application, which takes the most recent Bitcoin price data on the Bitstamp exchange every minute from the Cryptocompare API, uses a trained ML classifier to predict the direction of price change for the next minute. It notifies the user about the decision.

Now, to launch it, from a directory with project type sbt run (or $ sudo sbt run when required). Now let us see the contents of the application.conf file:

# This is the main configuration file for the application.
# Secret key
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
application.secret="%APPLICATION_SECRET%"
# The application languages
application.langs="en"
# Global object class
# Define the Global object class for this application.
# Default to Global in the root package.sb
# application.global=Global
# Router
# Define the Router object to use for this application.
# This router will be looked up first when the application is starting up,
# so make sure this is the entry point.
# Furthermore, it's assumed your route file is named properly.
# So for an application router like `my.application.Router`,
# you may need to define a router file `conf/my.application.routes`.
# Default to Routes in the root package (and conf/routes)
# application.router=my.application.Routes
# Database configuration
# You can declare as many datasources as you want.
# By convention, the default datasource is named `default`
rootDir = "<path>/Bitcoin_price_prediction/"
db.default.driver = org.h2.Driver
db.default.url = "jdbc:h2: "<path>/Bitcoin_price_prediction/DataBase"
db.default.user = user
db.default.password = ""
play.evolutions.db.default.autoApply = true
# Evolutions
# You can disable evolutions if needed
# evolutionplugin=disabled
# Logger
# You can also configure logback (http://logback.qos.ch/),
# by providing an application-logger.xml file in the conf directory.
# Root logger:
logger.root=ERROR
# Logger used by the framework:
logger.play=INFO
# Logger provided to your application:
logger.application=DEBUG
#Enable JobModule to run scheduler
play.modules.enabled += "modules.jobs.JobModule"
#Frequency in seconds to run job. Might make sense to put 30 seconds, for recent data
constants.frequency = 30
ml.model_version = "gbt_22_binary_classes_32660767.model"

Now you can understand that there are also several variables to configure/change based on your platform and choice:

  • Change the rootDir directory to the one you have used in TrainGBT:
rootDir = "<path>/ Bitcoin_price_prediction"
  • Specify the name for the database file:
db.default.url = "jdbc:h2: "<path>/Bitcoin_price_prediction/DataBase"
  • Specify the version of the model that is used for the actual prediction:
ml.model_version = "gbt_22_binary_classes_32660767.model"
Note that the folder with such a name has to be inside rootDir. So inside rootDir, create a folder named models and copy all the folders of trained models there.

This class also implements the actor trait and overrides the receive method. The best practice for it is to define types that can be received by the actor inside the companion object, thus establishing an interface for other classes:

object PredictionActor {
def props = Props[PredictionActor]
case class PriceData(timeFrom: Long,
timeTo: Long,
priceDelta: (Long, Double)*)
}

At first, PredictionActor loads a list of models from the models folder and loads the etalon model:

val models: List[(Transformer, String)] =
SubDirectoryRetriever.getListOfSubDirectories(modelFolder)
.map(modelMap => (PipelineModel.load(modelMap("path")),modelMap("modelName")))
.toList

First, we extract a list of subdirectories inside the models folder, and from each of them, we load the trained PipeLine model. In a similar way, the etalon model is loaded, but we already know its directory. Here's how a message of the PriceData type is handled inside the receive method:

override def receive: Receive = {
case data: PriceData =>
val priceData = shrinkData(data, 1, 22)
val (predictedLabelForUnknownTimestamp, details) =
predictionService.predictPriceDeltaLabel(priceData,productionModel)

The predicted label (string) and classification details are logged, so is it possible to see the probability distribution for each class? If the actor receives a message of another type, an error is shown and nothing more is done. Then the results are sent back to SchedulerActor and sent in the variable predictedWithCurrent, as was shown in the preceding code:

sender() ! CurrentDataWithShortTermPrediction(predictedLabelForUnknownTimestamp, data)

The sender is an ActorRef reference to an object that has sent the message we are processing at the moment, so we can pass the message back with the ! operator. Then, for each model we have loaded in the beginning, we predict the label for 1-minute-old data (rows 0-21 out of 23 in total) and get the actual price delta for the latest minute we know:

models.foreach { mlModel =>
val (predictedLabel, details) =predictionService.predictPriceDeltaLabel(shrinkData(data, 0, 21), mlModel._1)
val actualDeltaPoint = data.priceDelta.toList(22)

For each model, we store the following in the DB name of the model: the timestamp for each test prediction made, the label that was predicted by the model, and the actual delta. This information is used later to generate reports on the model's performance:

storeShortTermBinaryPredictionIntoDB( mlModel._2, actualDeltaPoint._1,
predictedLabel, actualDeltaPoint._2)
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset