Designing a workflow

A data scientist has many options in selecting and implementing a classification or clustering algorithm.

Firstly, a mathematical or statistical model must be selected to extract knowledge from the raw input data or from the output of an upstream data transformation. The selection of the model is constrained by the following parameters:

  • Business requirements such as accuracy of results
  • Availability of training data and algorithms
  • Access to a domain or subject-matter expert

Secondly, the engineer has to select a computational and deployment framework suitable for the amount of data to be processed. The computational context is defined by the following parameters:

  • Available resources such as machines, CPU, memory, or I/O bandwidth
  • Implementation strategy such as iterative versus recursive computation or caching
  • Requirements for the responsiveness of the overall process such as duration of computation or display of intermediate results

The following diagram illustrates the selection process to define the data transformation for each computation in the workflow:

Statistical and computation modeling for machine-learning applications

Tip

Domain expertise, data science, and software engineering

A domain or subject-matter expert is a person with authoritative or credited expertise in a particular area or topic. A chemist is an expert in the domain of chemistry and possibly related fields.

A data scientist solves problems related to data in a variety of fields such as biological sciences, health care, marketing, or finance. Data and text mining, signal processing, statistical analysis, and modeling using machine learning algorithms are some of the activities performed by a data scientist.

A software developer performs all the tasks related to creation of software applications, including analysis, design, coding, testing, and deployment.

The parameters of a data transformation may need to be reconfigured according to the output of the upstream data transformation. Scala's higher-order functions are particularly suitable for implementing configurable data transformations.
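
For instance, the following minimal sketch (the filterOutliers name and the threshold parameter are purely illustrative and not part of the framework built in this chapter) shows how a higher-order function turns a configuration parameter into a reusable transformation:

def filterOutliers(threshold: Double): Array[Double] => Array[Double] =
  (xs: Array[Double]) => xs.filter(x => Math.abs(x) <= threshold)

val filter3 = filterOutliers(3.0)   // transformation configured with threshold = 3.0
filter3(Array(0.5, -4.2, 1.1))      // Array(0.5, 1.1)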

The computational framework

The objective is to create a framework flexible and reusable enough to accommodate different workflows and support all types of machine learning algorithms from preprocessing, data smoothing, and classification to validation.

Scala provides us with a rich toolbox that includes monadic design, design patterns, and dependency injection using traits. The following diagram describes the three levels of complexity for creating the framework:

Hierarchical design of a monadic workflow

The first step is to define a trait and a method that describes the transformation of data by a computation unit (element of the workflow).

The pipe operator

Data transformation is the foundation of any workflow for processing and classifying a dataset, training and validating a model, and displaying results.

The objective is to define a symbolic representation of the transformation of different types of data without exposing the internal state of the algorithm implementing the data transformation. The pipe operator is used as the signature of a data transformation:

trait PipeOperator[-T, +U] {
  def |> (data: T): Option[U]
}

Note

F# reference

The notation |> as the signature of the transform or pipe operator is borrowed from the F# language [2:2]. The data transformation implements a function and therefore has the same variance signature as Scala's Function1[-T, +R].

The |> operator transforms data of the type T into data of the type U and returns an Option to handle internal errors and exceptions.
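
A minimal sketch of an implementation (the sqrtOp value is hypothetical and serves only as an illustration) shows how the Option return type reports invalid input without throwing an exception:

val sqrtOp = new PipeOperator[Double, Double] {
  // A negative input is an error condition, reported as None
  def |> (x: Double): Option[Double] =
    if (x >= 0.0) Some(Math.sqrt(x)) else None
}
sqrtOp |> 4.0    // Some(2.0)
sqrtOp |> -1.0   // None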

Tip

Advanced Scala idioms

The next two sections introduce a monadic representation of the data transformation and one implementation of the dependency injection to create a dynamic workflow as an alternative to the delimited continuation pattern. Although these topics may interest advanced Scala developers, they are not required to understand any of the techniques or procedures described in this book.

Monadic data transformation

The next step is to create a monadic design to implement the pipe operator. Let's use a monadic design to wrap _fct, a data transformation function (also known as operator), with the most commonly used Scala higher-order methods:

class _FCT[+T](val _fct: T) {
  def map[U](c: T => U): _FCT[U] = new _FCT[U](c(_fct))
  def flatMap[U](f: T => _FCT[U]): _FCT[U] = f(_fct)
  // zeroFCT (not shown) returns the zero, or neutral, element for the wrapped type
  def filter(p: T => Boolean): _FCT[T] = if (p(_fct)) new _FCT[T](_fct) else zeroFCT(_fct)
  def reduceLeft[U](f: (U,T) => U)(implicit c: T => U): U = f(c(_fct), _fct)
  def foldLeft[U](zero: U)(f: (U, T) => U)(implicit c: T => U): U = f(c(_fct), _fct)
  def foreach(p: T => Unit): Unit = p(_fct)
}

The methods of the _FCT class represent a subset of the traditional Scala higher-order methods for collections [2:3]. The _FCT class is to be used internally; arguments are validated by subclasses or containers.
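
As a brief, hypothetical illustration of this protocol, a value can be wrapped and manipulated through the monadic methods:

val fct = new _FCT[Int](4)
val doubled = fct.map(_ * 2)   // _FCT wrapping the value 8
doubled.foreach(println)       // prints 8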

Finally, the Transform class takes a PipeOperator instance as an argument and automatically invokes its operator:

class Transform[-T, +U](val op: PipeOperator[T, U]) extends _FCT[Function[T, Option[U]]](op.|>) {
  def |>(data: T): Option[U] = _fct(data)
}

You may wonder about the reason behind the monadic representation of a data transformation, Transform. You can create any algorithm by just implementing the PipeOperator trait, after all. The reason is that Transform has a richer protocol (methods) and enables developers to create a complex workflow as an alternative to the delimited continuation. The following code snippet illustrates a generic function composition or data transformation composition using the monadic approach:

val op = new PipeOperator[Int, Double] {
  def |> (n: Int): Option[Double] = Some(Math.sin(n.toDouble))
}

def g(f: Int => Option[Double]): (Int => Long) = {
  (n: Int) => {
    f(n) match {
      case Some(x) => x.toLong
      case None => -1L
    }
  }
}

val gof = new Transform[Int, Double](op).map(g(_))

This code extends op, an existing transformation, with another function, g. As stated in the Presentation section under Source code in Chapter 1, Getting Started, code related to exceptions, error checking, and validation of arguments is omitted (refer to the Format of code snippets section in Appendix A, Basic Concepts).
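
The composed function can then be invoked through the _FCT protocol, for example (a hypothetical call):

gof.foreach(h => println(h(4)))   // prints 0, that is, sin(4) truncated to a Long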

Dependency injection

This section presents the key constructs behind the Cake pattern. A workflow composed of configurable data transformations requires a dynamic modularization (substitution) of the different stages of the workflow. The Cake pattern is an advanced class composition pattern that uses mix-in traits to meet the demands of a configurable computation workflow. It is also known as stackable modification traits [2:4].

This is not an in-depth analysis of stackable trait injection and self-reference in Scala. There are a few interesting articles on dependency injection that are worth a look [2:5].

Java relies on packages, which are tightly coupled with the directory structure and the package prefix, to modularize the code base. Scala provides developers with a flexible and reusable approach to create and organize modules: traits. Traits can be nested, mixed with classes, stacked, and inherited.

Dependency injection is a fancy name for a reverse lookup of and binding to dependencies. Let's consider a simple application that requires data preprocessing, classification, and validation. A simple implementation using traits looks like this:

val myApp = new Classification with Validation with PreProcessing { val filter = .. }

If, at a later stage, you need to use an unsupervised clustering algorithm instead of a classifier, then the application has to be rewired:

val myApp = new Clustering with Validation with PreProcessing { val filter = ..  }

This approach results in code duplication and a lack of flexibility. Moreover, the filter class member needs to be redefined for each new class in the composition of the application. The problem arises when there is a dependency between the traits used in the composition of the application. Let's consider the case in which the filter depends on the validation methodology.

Tip

Mixins linearization [2:6]

The linearization, or the order of invocation of methods between mixins, follows a right-to-left pattern:

  • Trait B extends A
  • Trait C extends A
  • Class M extends N with C with B

The Scala compiler implements the linearization as follows:

M => B => C => A => N
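
A short, hypothetical sketch (the id method exists only for this illustration) makes the resolution order visible:

trait A { def id: String = "A" }
trait B extends A { override def id: String = "B->" + super.id }
trait C extends A { override def id: String = "C->" + super.id }
class N
class M extends N with C with B

new M().id   // "B->C->A": the rightmost mixin, B, is invoked first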

Although you can define filter as an abstract value, it still has to be redefined each time a new validation type is introduced. The solution is to use the self type in the definition of the newly composed PreProcessingWithValidation trait:

trait PreProcessingWithValidation extends PreProcessing {
  self: Validation =>
    val filter = ..
}

The application can then be composed by also mixing in the Validation trait, which satisfies the self type of PreProcessingWithValidation:

val myApp = new Classification with Validation with PreProcessingWithValidation

Tip

Overriding def with val

It is advantageous to declare a configurable member as a method (def) rather than as a value. Contrary to a value, which locks in a single implementation, a method declaration can later be implemented either by a method, which can return a different value for each invocation, or by a value:

  trait Validator { def validation: Validation }
  trait MyValidator extends Validator { val validation = … }

In Scala, a method declaration can be overridden by a value definition, not vice versa.

Let's adapt and generalize this pattern to construct a boilerplate template in order to create dynamic computational workflows.

The first step is to generate different modules to encapsulate different types of data transformation.

Workflow modules

The data transformation defined by the PipeOperator instance is dynamically injected into the module by initializing the abstract value. Let's define three parameterized modules representing the preprocessing, processing, and post-processing stages of a workflow:

trait PreprocModule[-T, +U] { val preProc: PipeOperator[T, U] }
trait ProcModule[-T, +U] { val proc: PipeOperator[T, U] }
trait PostprocModule[-T, +U] { val postProc: PipeOperator[T, U] }

The modules (traits) contain only a single abstract value. One characteristic of the Cake pattern is to enforce strict modularity by initializing the abstract values with the type encapsulated in the module, as follows:

trait ProcModule[-T, +U] {
  val proc: PipeOperator[T, U]
  class Classification[-T, +U] extends PipeOperator[T, U] { }
}

One of the objectives in building the framework is to allow developers to create data transformations (inherited from PipeOperator) independently of any workflow. Under this constraint, strict modularity is not an option.

Tip

Scala traits versus Java packages

There is a major difference between Scala and Java in terms of modularity. Java packages constrain developers to a strict syntax requirement; for instance, the source file must have the same name as the public class it contains. Scala modules based on stackable traits are far more flexible.

The workflow factory

The next step is to wire the different modules into a workflow. This is achieved by using a self reference to the stack of the three traits defined in the previous paragraph. Here is an implementation of this self reference:

class Workflow[T, U, V, W] {
  self: PreprocModule[T, U] with ProcModule[U, V] with PostprocModule[V, W] =>

  def |> (data: T): Option[W] = {
    preProc |> data match {
      case Some(input) => {
        proc |> input match {
          case Some(output) => postProc |> output
          case None => { … }
        }
      }
      case None => { … }
    }
  }
}

Quite simple indeed! If you need only two modules, you can either create a workflow with a stack of two traits or initialize the third with the PipeOperator identity:

def identity[T] = new PipeOperator[T,T] {
   override def |> (data:T): Option[T] = Some(data) 
}

Let's test the wiring with the following simple data transformations:

class Sampler(val samples: Int) extends PipeOperator[Double => Double, DblVector] {
   override def |> (f: Double => Double): Option[DblVector] =
      Some(Array.tabulate(samples)(n => f(n.toDouble/samples)) )
}

class Normalizer extends PipeOperator[DblVector, DblVector] {
   override def |> (data: DblVector): Option[DblVector] = 
     Some(Stats[Double](data).normalize)
}

class Reducer extends PipeOperator[DblVector, Int] {
  override def |> (data: DblVector): Option[Int] = 
      Range(0, data.size) find(data(_) == 1.0)
}

The first operator, Sampler, samples a function, f, with a frequency 1/samples over the interval [0, 1]. The second operator, Normalizer, normalizes the data over the range [0, 1] using the Stats class introduced in the Basic statistics section in Chapter 1, Getting Started. The last operator, Reducer, extracts the index of the largest sample (the normalized value 1.0) using the Scala collection method find.

A picture is worth a thousand words; the following UML class diagram illustrates the workflow factory design pattern:


Finally, the workflow is instantiated by dynamically initializing the abstract values, preProc, proc, and postProc, with a transformation of the type PipeOperator as long as the signature (input and output types) matches the parameterized types defined in each module (lines marked as 1):

val dataflow = new Workflow[Double => Double, DblVector, DblVector, Int]
        with PreprocModule[Double => Double, DblVector]
        with ProcModule[DblVector, DblVector]
        with PostprocModule[DblVector, Int] {

  val preProc: PipeOperator[Double => Double, DblVector] = new Sampler(100) //1
  val proc: PipeOperator[DblVector, DblVector] = new Normalizer //1
  val postProc: PipeOperator[DblVector, Int] = new Reducer //1
}
dataflow |> ((x: Double) => Math.log(x + 1.0) + Random.nextDouble) match {
  case Some(index) => …
  case None => …
}

Scala's strong type checking catches any inconsistent data types at compilation time. It shortens the development cycle, because runtime errors are much more difficult to track down than compilation errors.
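
If only two stages are needed, the same wiring can be reused by plugging the identity operator defined earlier into the post-processing module. The following variant is a sketch under that assumption:

val twoStages = new Workflow[Double => Double, DblVector, DblVector, DblVector]
        with PreprocModule[Double => Double, DblVector]
        with ProcModule[DblVector, DblVector]
        with PostprocModule[DblVector, DblVector] {

  val preProc: PipeOperator[Double => Double, DblVector] = new Sampler(100)
  val proc: PipeOperator[DblVector, DblVector] = new Normalizer
  val postProc: PipeOperator[DblVector, DblVector] = identity[DblVector] // pass-through stage
}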

Examples of workflow components

It is difficult to build a complete example of a workflow using classes and algorithms that are introduced later in the book. The modularization of the preprocessing and clustering stages is briefly described here to illustrate how the algorithms presented throughout the book can be encapsulated within a workflow.

The preprocessing module

The following examples of a workflow module use the time series class, XTSeries, which is used throughout the book:

class XTSeries[T](label: String, arr: Array[T])

The XTSeries class takes a label (identifier) and an array of parameterized values, arr, as parameters; it is formally described in Chapter 3, Data Preprocessing.
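
For illustration only (the label and the values are arbitrary), an instance of the time series wraps raw observations with a label:

val prices = new XTSeries[Double]("prices", Array(98.0, 101.5, 99.7, 102.3))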

The preprocessing algorithms such as moving average or discrete Fourier filters are encapsulated into a preprocessing module using a combination of abstract value and inheritance:

trait PreprocessingModule[T] {
  val preprocessor: Preprocessing[T]  //1

  abstract class Preprocessing[T] {  //2
     def execute(xt: XTSeries[T]): Unit
  }
  
  abstract class MovingAverage[T] extends Preprocessing[T] with PipeOperator[XTSeries[T], XTSeries[Double]]  { //3
    override def execute(xt: XTSeries[T]): Unit = this |> xt match {
       case Some(filteredData) => …
       case None => …
    }
  }

  class SimpleMovingAverage[@specialized(Double) T <% Double](period: Int)(implicit num: Numeric[T]) extends MovingAverage[T] {
    override def |> (xt: XTSeries[T]): Option[XTSeries[Double]] = …
  }

  class DFTFir[T <% Double](g: Double => Double) extends Preprocessing[T] with PipeOperator[XTSeries[T], XTSeries[Double]] {
    override def execute(xt: XTSeries[T]): Unit = this |> xt match {
      case Some(filteredData) => …
      case None => …
    }
    override def |> (xt: XTSeries[T]): Option[XTSeries[Double]] = …
  }
}

The preprocessing module, PreprocessingModule, defines an abstract value, preprocessor, which is initialized at runtime (line 1). The Preprocessing class is defined as a high-level abstract class with a generic execution method, execute (line 2). The preprocessing algorithms, in this case the moving average filter, MovingAverage, and the discrete Fourier filter, DFTFir, are defined as a class hierarchy with the base type Preprocessing. Each filtering class also implements PipeOperator so that it can be woven into a simpler data transformation workflow (line 3).
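
The following sketch, which assumes that an xt instance of the XTSeries[Double] type is available and picks an arbitrary period, illustrates how a concrete filter could be injected at runtime:

val preprocModule = new PreprocessingModule[Double] {
  val preprocessor: Preprocessing[Double] = new SimpleMovingAverage[Double](8)
}
preprocModule.preprocessor.execute(xt)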

The preprocessing algorithms are described in the next chapter.

The clustering module

The encapsulation of clustering techniques is the second example of a module for dependency-injection-based workflow:

trait ClusteringModule[T] { 
  type EMOutput = List[(Double, DblVector, DblVector)]
  val clustering: Clustering[T] //1
  
  abstract class Clustering[T] {
    def execute(xt: XTSeries[Array[T]]): Unit 
  }
  
  class KMeans[T <% Double](K: Int, maxIters: Int, distance: (DblVector, Array[T]) => Double)(implicit order: Ordering[T], m: Manifest[T]) extends Clustering[T] with PipeOperator[XTSeries[Array[T]], List[Cluster[T]]] {

    override def |> (xt: XTSeries[Array[T]]): Option[List[Cluster[T]]] = …

    override def execute(xt: XTSeries[Array[T]]): Unit = this |> xt match {
      case Some(clusters) => …
      case None => …
    }
  }
  
  class MultivariateEM[T <% Double](K: Int) extends Clustering[T] with PipeOperator[XTSeries[Array[T]], EMOutput] {
    override def |> (xt: XTSeries[Array[T]]): Option[EMOutput] = …

    override def execute(xt: XTSeries[Array[T]]): Unit = this |> xt match {
      case Some(emOutput) => …
      case None => …
    }
  }
}

The ClusteringModule clustering module defines an abstract value, clustering, which is initialized at runtime (line 1). The two clustering algorithms, KMeans and the expectation-maximization algorithm, MultivariateEM, inherit the Clustering base class. The clustering technique can be used in:

  • A dependency-injection-based workflow by overriding execute
  • A simpler data transformation flow by overriding PipeOperator (|>)
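
A hypothetical instantiation, similar to the one shown for the preprocessing module, injects a concrete clustering algorithm at runtime (the number of clusters, 4, is arbitrary, and xt is an existing XTSeries[Array[Double]] instance):

val clusteringModule = new ClusteringModule[Double] {
  val clustering: Clustering[Double] = new MultivariateEM[Double](4)
}
clusteringModule.clustering.execute(xt)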

The clustering techniques are described in Chapter 4, Unsupervised Learning.

Note

Dependency-injection-based workflow versus data transformation

The data transformation PipeOperator trades flexibility for simplicity. The design proposed for preprocessing and clustering techniques allows you to use both approaches. The techniques presented in the book implement the basic data transformation, PipeOperator, in order to keep the implementation of these techniques as simple as possible.
