Let's kick the tires

This final section introduces the key elements of the training and classification workflow. A test case using a simple logistic regression is used to illustrate each step of the computational workflow.

Overview of computational workflows

In its simplest form, a computational workflow to perform runtime processing of a dataset is composed of the following stages:

  1. Loading the dataset from files, databases, or any streaming devices.
  2. Splitting the dataset for parallel data processing.
  3. Preprocessing the data with filtering techniques, analysis of variance, and penalty and normalization functions whenever necessary.
  4. Applying the model, either a set of clusters or a set of classes, to classify new data.
  5. Assessing the quality of the model.

A similar sequence of tasks is used to extract a model from a training dataset:

  1. Loading the dataset from files, databases, or any streaming devices.
  2. Splitting the dataset for parallel data processing.
  3. Applying filtering techniques, analysis of variance, and penalty and normalization functions to the raw dataset whenever necessary.
  4. Selecting the training, testing, and validation set from the cleansed input data.
  5. Extracting key features and establishing affinity between similar groups of observations using clustering techniques or supervised learning algorithms.
  6. Reducing the number of features to a manageable set of attributes to avoid overfitting the training set.
  7. Validating and tuning the model by iterating through steps 5, 6, and 7 until the error meets the convergence criteria.
  8. Storing the model in a file or database so that it can be loaded for runtime processing of new observations.
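
Each of these stages can be viewed as a data transformation, and the whole workflow as their composition. The following minimal sketch (with hypothetical stage names and types, not the ones used later in this chapter) illustrates how such a sequence of stages could be composed into a single workflow in Scala:

// Minimal, hypothetical sketch: each stage is a function, and the workflow is
// their composition. The names and types are illustrative placeholders only.
type RawData  = Vector[Array[String]]
type Features = Vector[(Double, Double)]
type Model    = Array[Double]

val loadStage: String => RawData      = fileName => Vector.empty       // 1. load
val cleanseStage: RawData => Features = raw => Vector.empty            // 2-3. split and filter
val trainStage: Features => Model     = features => Array.fill(3)(0.0) // 4-7. select, train, validate

// Chaining the stages with Function1.andThen yields the complete workflow
val trainingWorkflow: String => Model =
  loadStage andThen cleanseStage andThen trainStage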

Data clustering and data classification can be performed independently of each other or as part of a workflow that uses clustering techniques as a preprocessing stage of the training phase of a supervised learning algorithm. Data clustering does not require a model to be extracted from a training set, while classification can be performed only if a model has been built from the training set. The following image gives an overview of training and classification:

A generic data flow for training and running a model

This diagram is an overview of a typical data mining processing pipeline. The first phase consists of extracting the model through clustering or through the training of a supervised learning algorithm. The model is then validated against test data drawn from the same source as the training set but containing different observations. Once the model is created and validated, it can be used to classify real-time data or predict future behavior. In reality, real-world workflows are more complex and need to be dynamically configurable to allow experimentation with different models. Several alternative classifiers can be used to perform a regression, and different filtering algorithms are applied to the input data depending on the latent noise in the raw data.

Writing a simple workflow

This book relies on financial data to experiment with different learning strategies. The objective of the exercise is to build a model that can discriminate between volatile and nonvolatile trading sessions. For this first example, we select a simplified version of logistic regression as our classifier, as we treat stock price-volume action as a continuous or pseudo-continuous process.

Note

Logistic regression

Logistic regression is treated in depth in Chapter 6, Regression and Regularization. The model treated in this example is a simple binary classifier using logistic regression for two-dimensional observations.

The classification of trading sessions according to their volatility consists of the following steps:

  • Select a dataset
  • Load the dataset
  • Preprocess the dataset
  • Display data
  • Create the model through training
  • Classify new data

Selecting a dataset

Throughout the book, we will rely on financial data to evaluate and discuss the merits of different data processing and machine learning methods. In this example, the data is extracted from Yahoo! Finance in CSV format with the following fields:

  • Date
  • Price at open
  • Highest price in session
  • Lowest price in session
  • Price at session close
  • Volume
  • Adjusted price at session close
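
For reference, the header and a sample row of such a CSV file look as follows (the row is taken from the test file used at the end of this section):

Date,Open,High,Low,Close,Volume,Adj Close
3/9/2011,14.78,15.08,14.20,14.91,4.79E+08,14.88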

Let's create a program that loads the content of the file, executes some basic preprocessing functions, and creates a simple model. We selected the CSCO stock price between January 1, 2012 and December 1, 2013 as our data input.

Let's consider two variables, price and volume, as illustrated by the following screenshot. The top graph displays the variation of the price of Cisco stock over time and the bottom bar chart represents the daily trading volume on Cisco stock over time:

Price-Volume action for the Cisco stock

Loading the dataset

The first step is loading the dataset from a local file. Typically, large datasets are loaded from a database or distributed filesystem such as Hadoop Distributed File System (HDFS), as shown here:

import scala.io.Source

def load(fileName: String): Option[XYTSeries] = {
  val src = Source.fromFile(fileName)
  val fields = src.getLines.map(_.split(CSV_DELIM)).toArray //1
  val cols = fields.drop(1) //2
  val data = transform(cols)
  src.close //3
  Some(data)
}

The transform method will be described in the next section.

The data file is read through an invocation of the Source.fromFile method, and then the fields are extracted through a map (line 1). The header (first) row is removed with a call to drop (line 2).

Tip

Data extraction

The Source.fromFile(...).getLines.map invocation pipeline returns an iterator, which needs to be converted into an array to store the information in memory.

The file has to be closed to avoid leaking the file handle (line 3).
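
As an aside, a safer variant of the load method shown above (a sketch only; the loadSafe name and the use of scala.util.Try are our assumptions, not the book's code) wraps the extraction in Try and closes the source in a finally clause, so the handle is released even if parsing fails:

import scala.io.Source
import scala.util.Try

// Sketch: guarantees the file handle is released even if parsing fails
def loadSafe(fileName: String): Option[XYTSeries] = {
  val src = Source.fromFile(fileName)
  try
    Try(transform(src.getLines.map(_.split(CSV_DELIM)).toArray.drop(1))).toOption
  finally
    src.close
}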

Tip

Code readability

A long pipeline of Scala higher-order methods makes the code quite difficult to read. It is recommended that you break down long chains of method calls. The following code is an example of a long chain of method calls:

val cols = Source.fromFile(fileName).getLines.map(_.split(CSV_DELIM)).toArray.drop(1)

We can break down such method calls into several steps as follows:

val lines = Source.fromFile(fileName).getLines
val fields = lines.map(_.split(CSV_DELIM)).toArray
val cols = fields.drop(1)

We strongly encourage you to consult the excellent guide Effective Scala, written by Marius Eriksen from Twitter. This is definitely a must-read for any Scala developer [1:10].

Preprocessing the dataset

The next step is to normalize the data into the range [0, 1] so that it can be processed by the binary logistic classifier. This is a good time to introduce a basic statistics class.

Basic statistics

We select the computation of the mean and standard deviation of the two time series as the first step of the preprocessing phase. The computation of these statistics can be implemented with the reducer methods reduceLeft and foldLeft:

val mean = price.reduceLeft(_ + _)/price.size
val s2 = price.foldLeft(0.0)((s, x) => s + (x - mean)*(x - mean))
val stdDev = Math.sqrt(s2/(price.size - 1))
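
For reference, these lines compute the sample mean and the unbiased sample variance:

\[ \bar{x} = \frac{1}{n}\sum_{i=1}^{n} x_i, \qquad s^2 = \frac{1}{n-1}\sum_{i=1}^{n}(x_i - \bar{x})^2 = \frac{\sum_{i=1}^{n} x_i^2 - n\,\bar{x}^2}{n-1} \]

The second form of the variance, written in terms of the sum of squares, allows both statistics to be accumulated in a single pass over the data.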

However, this implementation has one major drawback: the dataset (price in this example) has to be traversed for each method (mean, stdDev, min, max, and so on).

One of the solutions is to create a class that computes the counters and the statistics on demand using, once again, lazy values:

class Stats[T <% Double](private val values: DVector[T]) {
  class _Stats(var minValue: Double, var maxValue: Double, var sum: Double, var sumSqr: Double)

  val stats = {
    val _stats = new _Stats(Double.MaxValue, Double.MinValue, 0.0, 0.0)
    values.foreach(x => {
      if(x < _stats.minValue) _stats.minValue = x
      if(x > _stats.maxValue) _stats.maxValue = x
      _stats.sum += x
      _stats.sumSqr += x*x
    })
    _stats
  }

  lazy val mean = stats.sum/values.size
  lazy val variance = (stats.sumSqr - mean*mean*values.size)/(values.size - 1)
  lazy val stdDev = if(variance < ZERO_EPS) ZERO_EPS else Math.sqrt(variance)
  lazy val min = stats.minValue
  lazy val max = stats.maxValue
}

We made the statistics class generic by using the view bound T <% Double, which assumes a conversion from type T to Double. By defining the statistics as a set of counters (minimum value, maximum value, sum of values, and sum of squared values) and folding these values into a statistics object, we limit the traversal of the dataset to a single pass and avoid recomputing these counters each time one of the statistics is needed.

The code illustrates the use and benefit of lazy values in Scala. The mean is computed only if and when needed.
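
The following usage sketch (assuming the book's DVector[T] type alias resolves to Array[T], and using hypothetical price values) illustrates the single traversal and the on-demand evaluation:

// Sketch: the counters are computed in a single pass; each statistic is
// evaluated lazily, on first access (the price values are hypothetical)
val price = Array(24.5, 25.1, 23.8, 26.0, 24.9)
val stats = new Stats[Double](price)

val mu    = stats.mean       // triggers evaluation of the lazy value
val sigma = stats.stdDev     // reuses the counters from the same pass
val range = stats.max - stats.min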

Normalization and Gauss distribution

Statistics are typically used to normalize data into the range [0, 1], as required by most classification or clustering algorithms. It is logical to add the normalization method to the Stats class, as we have already extracted the min and max values:

def normalize: DblVector = {
  val range = max - min
  values.map(x => (x - min)/range)
}
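
For example, with min = 10.0 and max = 30.0, the value 15.0 is normalized to (15.0 - 10.0)/(30.0 - 10.0) = 0.25.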

The same approach is used to compute the normal (Gauss) distribution:

def gauss: DblVector = 
  values.map(x => {
    val y = x - mean
    INV_SQRT_2PI/stdDev * Math.exp(-0.5*y*y/(stdDev*stdDev))
  })
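
For reference, the gauss method evaluates, for each value, the density of the normal distribution with the dataset's mean and standard deviation, where INV_SQRT_2PI stands for \(1/\sqrt{2\pi}\):

\[ f(x) = \frac{1}{\sigma\sqrt{2\pi}}\,\exp\left(-\frac{(x-\mu)^2}{2\sigma^2}\right) \]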

The price action chart has a very interesting characteristic. On closer inspection, a sudden change in price and an increase in volume occur about every three months or so. Experienced investors will undoubtedly recognize that those price-volume patterns are related to the release of quarterly earnings of Cisco. Such regular but unpredictable patterns can be a source of concern or opportunity if risk can be managed. The strong reaction of the stock price to the release of corporate earnings may scare some long-term investors while enticing day traders.

The following graph visualizes the potential correlation between sudden price change (volatility) and heavy trading volume:

Correlation price-volume action for the Cisco stock

Let's try to correlate the volatility of the stock price with the trading volume. For the sake of this exercise, we define the volatility as the maximum variation of the stock price within each trading session: the difference between the highest and the lowest price during the session.

The YahooFinancials enumeration extracts historical stock prices and session volume from a CSV file. For example, the volatility is extracted from the CSV fields of each line in the CSV file as follows:

object YahooFinancials extends Enumeration {
   type YahooFinancials = Value
   val DATE, OPEN, HIGH, LOW, CLOSE, VOLUME, ADJ_CLOSE = Value
   val volatility = (fs: Array[String]) => fs(HIGH.id).toDouble - fs(LOW.id).toDouble
   …
}
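
The volume extractor invoked by the transform method that follows is part of the elided code above; a plausible definition inside the same enumeration, following the pattern of the volatility extractor, would be (an assumption for illustration, not the book's actual code):

// Hypothetical: mirrors the volatility extractor for the VOLUME column
val volume = (fs: Array[String]) => fs(VOLUME.id).toDouble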

The transform method uses the YahooFinancials enumeration to generate the input data for the model:

def transform(cols: Array[Array[String]]): XYTSeries = {
  // Stats[Double](...) assumes a companion object with an apply factory (not shown)
  val volatility = Stats[Double](cols.map(YahooFinancials.volatility)).normalize
  val volume = Stats[Double](cols.map(YahooFinancials.volume)).normalize
  volatility.zip(volume)
}

The volatility and volume data is normalized using the Stats.normalize method defined earlier.

Plotting data

Although charting is not the primary goal of this book, we thought you would benefit from a brief introduction to JFreeChart. The skeleton code to generate a scatter plot is rather simple. The most relevant part is the transformation of an XYTSeries into JFreeChart's XYSeries:

val xLegend = "Session Volatility"
val yLegend = "Session Volume"

def display(xy: XYTSeries, w: Int, h: Int): Unit = {
  val series = new XYSeries("CSCO 2012-2013 Stock")
  xy.foreach(x => series.add(x._1, x._2))
  val seriesCollection = new XYSeriesCollection
  seriesCollection.addSeries(series)
  … // plot rendering code
  val chart = ChartFactory.createScatterPlot("CSCO 2012-2013 Stock", xLegend, yLegend, seriesCollection, PlotOrientation.VERTICAL, true, false, false)
  createFrame("Logistic Regression", chart)
}
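
The createFrame method is part of the plot rendering code omitted above. A minimal sketch, assuming JFreeChart's ChartFrame is used to host the chart (the actual helper in the book's code base may differ), could look like this:

import org.jfree.chart.{ChartFrame, JFreeChart}

// Sketch: wraps the chart in a Swing frame and displays it
def createFrame(title: String, chart: JFreeChart): Unit = {
  val frame = new ChartFrame(title, chart)
  frame.pack()
  frame.setVisible(true)
}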

Note

Visualization

The JFreeChart library is introduced as a robust charting tool. The visualization of the results of a computation is beyond the scope of this book. The code related to plots and charts is omitted from the book in order to keep the code snippets concise and focused on machine learning. On a few occasions, output data is formatted as a CSV file to be simply imported into a spreadsheet.

Here is an example of a plot using the ScatterPlot.display method:

val plot = new ScatterPlot(("CSCO 2012-2013", "Session High - Low", "Session Volume"), new BlackPlotTheme)
plot.display(volatilityVol.filter(_._1 < 0.5), 250, 340)

Scatter plot of volatility and volume for the Cisco stock

There is a level of correlation between session volume and session volatility. We can use this information to classify trading sessions by their volatility.

Creating a model (learning)

The objective of the training is to build a model that can discriminate between volatile and nonvolatile trading sessions. For the sake of the exercise, the session volatility (derived from the session high and low prices) and the session volume constitute the two parameters of the model.

Logistic regression is commonly used in statistical inference. The following implementation of the binary logistic regression classifier exposes a single method, classify, to comply with our desire to reduce the complexity and life cycle of objects. The model parameters, weights, are computed during training, when the LogBinRegression class/model is instantiated. As mentioned earlier, the sections of the code nonessential to the understanding of the algorithm are omitted:

class LogBinRegression(val labels: DVector[(XY, Double)], val maxIters: Int, val eta: Double, val eps: Double) {
  val dim = 3
  val weights = train

  def classify(xy: XY): Option[(Boolean, Double)] = weights match {
    case Some(w) =>
      val likelihood = sigmoid(w(0) + xy._1*w(1) + xy._2*w(2))
      Some((likelihood > 0.5, likelihood))
    case None => None
  }

The training method, train, consists of iterating through the computation of the weights using a simple gradient descent. The method computes the weights and returns an option, so the model is either trained and ready for runtime classification or nonexistent (None):

def train: Option[DblVector] = {
  val w = Array.fill(dim)(Random.nextDouble - 1.0)  // random initialization (scala.util.Random)

  Range(0, maxIters).find(_ => {
    val deltaW = labels.foldLeft(Array.fill(dim)(0.0))((dw, lbl) => {
      val y = sigmoid(w(0) + w(1)*lbl._1._1 + w(2)*lbl._1._2)
      dw.map(dx => dx + (lbl._2 - y)*(lbl._1._1 + lbl._1._2))
    })
    val nextW = Array.fill(dim)(0.0)
                     .zipWithIndex
                     .map(nw => w(nw._2) + eta*deltaW(nw._2))
    val diff = Math.abs(nextW.sum - w.sum)
    nextW.copyToArray(w)
    diff < eps
  }) match {
    case Some(iters) => Some(w)
    case None => { … }
  }
}

def sigmoid(x: Double): Double = 1.0/(1.0 + Math.exp(-x))

The iteration is encapsulated in the Scala find method, which exits if the algorithm converges (diff < eps). The model parameters, weights, are set to None if the maximum number of iterations is reached.

The training method, train, iterates across the set of observations by computing the gradient between the predicted and observed values. In our simplistic approach, the gradient is computed as a linear function of the sigmoid of the sum of the products of the weights and the training observations. As with any optimization problem, the initialization of the solution vector, weights, is critical. We choose to initialize the weights with random values, although in practice, you would use a more deterministic approach to initialize the model parameters.
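
For reference, the likelihood computed for a two-dimensional observation is the logistic (sigmoid) function applied to the linear combination of the weights, as implemented in the classify and sigmoid methods:

\[ \hat{y} = \sigma(w_0 + w_1 x_1 + w_2 x_2), \qquad \sigma(t) = \frac{1}{1+e^{-t}} \]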

In order to train the model, we need to label the data. The process consists of tagging every trading session as volatile or nonvolatile according to the observations (relative session volatility and session volume). The labeling process is usually quite cumbersome; therefore, let's generate the labels automatically. A trading session is considered volatile if its normalized volatility and volume are both greater than 0.3, that is, 30 percent of the maximum observed volatility and volume:

val labels = volatilityVol.zip(volatilityVol.map(x => if(x._1 > 0.3 && x._2 > 0.3) 1.0 else 0.0))

Note

Automated labeling

Although quite convenient, the automated creation of training labels is not without risk because it may mislabel singular observations. This technique is used in this test for convenience, but it is not recommended unless a domain expert reviews the labels manually.

The model is created (trained) by a simple instantiation of the logistic binary classifier:

val logit = new LogBinRegression(labels, 300, 0.00005, 0.02)

The training run is configured with a maximum of 300 iterations, a learning rate (eta) of 0.00005, and a convergence criterion (eps) of 0.02.

Classifying the data

Finally, the model can be tested with a fresh dataset that is not related to the training set:

Date,Open,High,Low,Close,Volume,Adj Close
3/9/2011,14.78,15.08,14.20,14.91,4.79E+08,14.88
11/17/2009,10.78,10.90,10.62,10.84,3901987,10.85

It is just a matter of executing the classification method (exceptions, conditions on method arguments, and returned values are omitted):

val testData = load("resources/data/chap1/CSCO2.csv").get  // error handling omitted

logit.classify(testData(0)) match {
  case Some(topCategory) => Display.show(topCategory)
  case None => { … }
}
logit.classify(testData(1)) match {
  case Some(topCategory) => Display.show(topCategory)
  case None => { … }
}

The result of the classification is (true,0.516) for the first sample and (false,0.1180) for the second sample.

Note

Validation

The simple classification in this test case is provided to illustrate the runtime application of the model. It does not constitute a validation of the model by any stretch of the imagination. The next chapter digs into validation metrics and methodology.
