Joining average stats into a graph

The previous example shows how flexible the aggregateMessages operator is. We can choose the type Msg of the messages to be aggregated to fit our needs, select which nodes receive the messages, and define how the messages are merged.
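To make these three choices concrete without a Spark cluster, here is a minimal plain-Scala sketch that mimics what aggregateMessages does on a list of edges: a sendMsg function emits messages to an edge's source and/or destination, and a mergeMsg function combines all messages addressed to the same vertex. The names AggregateSketch, Edge, Ctx and aggregate are hypothetical stand-ins, not GraphX APIs; the real operator runs distributed over edge triplets and returns a VertexRDD.

```scala
object AggregateSketch {
  type VertexId = Long

  // Hypothetical stand-in for a graph edge carrying an attribute of type E
  final case class Edge[E](srcId: VertexId, dstId: VertexId, attr: E)

  // Hypothetical stand-in for GraphX's EdgeContext: records the messages sent while visiting one edge
  final class Ctx[E, M](val edge: Edge[E]) {
    val sent = scala.collection.mutable.ListBuffer.empty[(VertexId, M)]
    def attr: E = edge.attr
    def sendToSrc(m: M): Unit = sent += (edge.srcId -> m)
    def sendToDst(m: M): Unit = sent += (edge.dstId -> m)
  }

  // Visit every edge with sendMsg, then fold the messages addressed to each vertex with mergeMsg.
  // Vertices that receive no message are absent from the result.
  def aggregate[E, M](edges: Seq[Edge[E]])(sendMsg: Ctx[E, M] => Unit,
                                           mergeMsg: (M, M) => M): Map[VertexId, M] = {
    val messages = edges.flatMap { e =>
      val ctx = new Ctx[E, M](e)
      sendMsg(ctx)
      ctx.sent
    }
    messages.groupBy(_._1).map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
  }

  // Example: count (wins, losses) per team, with each edge pointing from winner to loser
  val games = Seq(Edge(1L, 2L, "g1"), Edge(1L, 3L, "g2"), Edge(3L, 2L, "g3"))
  val record: Map[VertexId, (Int, Int)] =
    aggregate[String, (Int, Int)](games)(
      ctx => { ctx.sendToSrc((1, 0)); ctx.sendToDst((0, 1)) },
      (x, y) => (x._1 + y._1, x._2 + y._2)
    )
}
```

Here the message type is a pair, both endpoints of every edge receive a message, and merging is element-wise addition: the same three choices the team-stats example below makes with an 8-element tuple.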

As a final example, let's aggregate several statistics about each team and join this information into the nodes of the graph:

  1. To start, we create a dedicated class for the team stats:
    // Average stats of one team
    case class TeamStat(
          wins: Int    = 0    // Number of wins
        , losses: Int  = 0    // Number of losses
        , ppg: Int     = 0    // Points per game
        , pcg: Int     = 0    // Points conceded per game
        , fgp: Double  = 0    // Field goal percentage
        , tpp: Double  = 0    // Three-point percentage
        , ftp: Double  = 0    // Free throw percentage
    ) {
        override def toString = s"$wins-$losses"
    }
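  2. We collect the stats of all teams using aggregateMessages. To do so, we define the message type as an 8-element tuple that holds the counters for games played, won, and lost, followed by the totals of points scored, points conceded, and the three shooting percentages that will be averaged into TeamStat. For each game, the winner's statistics are sent to the edge's source vertex and the loser's statistics to its destination vertex: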
    import org.apache.spark.graphx._

    // (games played, wins, losses, points scored, points conceded, FG%, 3P%, FT%)
    type Msg = (Int, Int, Int, Int, Int, Double, Double, Double)

    val aggrStats: VertexRDD[Msg] = scoreGraph.aggregateMessages[Msg](
      // sendMsg: the winner (source) and the loser (destination) each get one message
      t => {
        t.sendToSrc((1,
                     1, 0,
                     t.attr.winnerStats.score,
                     t.attr.loserStats.score,
                     t.attr.winnerStats.fgPercent,
                     t.attr.winnerStats.tpPercent,
                     t.attr.winnerStats.ftPercent
                   ))
        t.sendToDst((1,
                     0, 1,
                     t.attr.loserStats.score,
                     t.attr.winnerStats.score,
                     t.attr.loserStats.fgPercent,
                     t.attr.loserStats.tpPercent,
                     t.attr.loserStats.ftPercent
                   ))
      },
      // mergeMsg: add the two tuples element by element
      (x, y) => ( x._1 + y._1, x._2 + y._2,
                  x._3 + y._3, x._4 + y._4,
                  x._5 + y._5, x._6 + y._6,
                  x._7 + y._7, x._8 + y._8
                )
    )
  3. Given the aggregated messages aggrStats, we map them into a collection of TeamStat values by dividing each total by the number of games played:
    val teamStats: VertexRDD[TeamStat] = aggrStats mapValues {
      (id: VertexId, m: Msg) => m match {
        case (count: Int,
              wins: Int,
              losses: Int,
              totPts: Int,
              totConcPts: Int,
              totFG: Double,
              totTP: Double,
              totFT: Double) => TeamStat(wins, losses,
                                         totPts / count,     // Int division, as ppg is an Int
                                         totConcPts / count, // Int division, as pcg is an Int
                                         totFG / count,
                                         totTP / count,
                                         totFT / count)
      }
    }
  4. Next, let's join teamStats into the graph. To do so, we first create a class Team as the new vertex attribute type. A Team holds the team name and an optional TeamStat:
    case class Team(name: String, stats: Option[TeamStat]) {
        override def toString = name + ": " + stats
    }
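  5. We use the joinVertices operator, which we saw in the previous chapter. First, mapVertices turns each vertex name into a Team without stats; then joinVertices fills in the stats of every team that appears in teamStats: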
    // Joining the average stats to vertex attributes
    def addTeamStat(id: VertexId, t: Team, stats: TeamStat) = Team(t.name, Some(stats))
    
    val statsGraph: Graph[Team, FullResult] = 
        scoreGraph.mapVertices((_, name) => Team(name, None)).
                   joinVertices(teamStats)(addTeamStat)
  6. We can check that the join worked by printing the first three vertices of the new graph statsGraph:
    scala> statsGraph.vertices.take(3).foreach(println)
    (1260,Loyola-Chicago: Some(17-13))
    (1410,TX Pan American: Some(7-21))
    (1426,UT Arlington: Some(15-15))
    
  7. To conclude this task, let's find the top 10 teams of the regular season. To do so, we define an Ordering for Option[TeamStat] that compares teams by wins and, on equal wins, by losses. A team without stats (None) ranks below any team with stats:
    import scala.math.Ordering
    object winsOrdering extends Ordering[Option[TeamStat]] {
        def compare(x: Option[TeamStat], y: Option[TeamStat]) = (x, y) match {
            case (None, None)       => 0
            case (Some(_), None)    => 1
            case (None, Some(_))    => -1
            // More wins compare as greater. On equal wins, more losses compare as
            // greater, so a descending sort lists the team with more losses first.
            case (Some(a), Some(b)) => if (a.wins == b.wins) a.losses compare b.losses
                                       else a.wins compare b.wins
        }
    }
  8. Finally, we sort the vertices in descending order with this ordering and print the first 10:
    import scala.reflect.classTag
    import scala.reflect.ClassTag
    scala> statsGraph.vertices.sortBy(v => v._2.stats,false)(winsOrdering, classTag[Option[TeamStat]]).
         |                             take(10).foreach(println)
    (1246,Kentucky: Some(34-0))
    (1437,Villanova: Some(32-2))
    (1112,Arizona: Some(31-3))
    (1458,Wisconsin: Some(31-3))
    (1211,Gonzaga: Some(31-2))
    (1320,Northern Iowa: Some(30-3))
    (1323,Notre Dame: Some(29-5))
    (1181,Duke: Some(29-4))
    (1438,Virginia: Some(29-3))
    (1268,Maryland: Some(27-6))

    Note

    RDD.sortBy takes two implicit parameters: an Ordering and a ClassTag for the sort key. Because we pass winsOrdering explicitly, we must also supply the ClassTag explicitly, which is why we imported classTag from scala.reflect.
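Steps 3 to 5 can be traced on ordinary Scala collections. The following is a minimal sketch, not the GraphX code itself: toTeamStat mirrors the body of mapValues in step 3, and join mimics mapVertices followed by joinVertices by keeping every vertex and filling in stats only where an aggregate exists. The object and helper names (TeamStatsSketch, toTeamStat, join), the team names, and the summed tuples are hypothetical and are not data from the book; TeamStat and Team are redefined here so the block stands alone.

```scala
object TeamStatsSketch {
  type VertexId = Long
  // (games played, wins, losses, points scored, points conceded, FG%, 3P%, FT%)
  type Msg = (Int, Int, Int, Int, Int, Double, Double, Double)

  case class TeamStat(wins: Int = 0, losses: Int = 0, ppg: Int = 0, pcg: Int = 0,
                      fgp: Double = 0, tpp: Double = 0, ftp: Double = 0) {
    override def toString = s"$wins-$losses"
  }

  case class Team(name: String, stats: Option[TeamStat]) {
    override def toString = s"$name: $stats"
  }

  // Step 3: turn summed totals into per-game averages (Int division for points)
  def toTeamStat(m: Msg): TeamStat = m match {
    case (count, wins, losses, totPts, totConcPts, totFG, totTP, totFT) =>
      TeamStat(wins, losses, totPts / count, totConcPts / count,
               totFG / count, totTP / count, totFT / count)
  }

  // Steps 4-5: every vertex becomes a Team; stats are Some(...) only if an aggregate exists
  def join(names: Map[VertexId, String], stats: Map[VertexId, TeamStat]): Map[VertexId, Team] =
    names.map { case (id, name) => id -> Team(name, stats.get(id)) }

  // Hypothetical totals for two games in which team 1 beat team 2 twice
  val sums: Map[VertexId, Msg] = Map(
    (1L, (2, 2, 0, 150, 130, 0.90, 0.70, 1.50)),
    (2L, (2, 0, 2, 130, 150, 0.80, 0.60, 1.40))
  )

  val teamStats: Map[VertexId, TeamStat] = sums.map { case (id, m) => id -> toTeamStat(m) }

  // Team 3 played no games, so it keeps stats = None, as with joinVertices
  val teams: Map[VertexId, Team] =
    join(Map(1L -> "Alpha", 2L -> "Beta", 3L -> "Gamma"), teamStats)
}
```

With these inputs, teams(1L) prints as `Alpha: Some(2-0)` and teams(3L) has no stats, which is the same shape as the statsGraph vertices printed in step 6.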
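The comparator in step 7 breaks ties on wins by comparing losses in ascending order, so the descending sort lists the team with more losses first; that is why Arizona and Wisconsin (31-3) appear ahead of Gonzaga (31-2) in the output above. If you would rather rank a team with fewer losses higher, one option is to reverse the loss comparison. The sketch below shows this on a plain Scala List rather than an RDD; the object name winsThenFewerLosses and the NoStats entry are hypothetical, while the records are taken from the output above.

```scala
object RankingSketch {
  case class TeamStat(wins: Int, losses: Int) {
    override def toString = s"$wins-$losses"
  }

  // More wins compare as greater; on equal wins, FEWER losses compare as greater.
  // None (a team without stats) compares below any Some.
  object winsThenFewerLosses extends Ordering[Option[TeamStat]] {
    def compare(x: Option[TeamStat], y: Option[TeamStat]): Int = (x, y) match {
      case (None, None)       => 0
      case (Some(_), None)    => 1
      case (None, Some(_))    => -1
      case (Some(a), Some(b)) =>
        if (a.wins == b.wins) b.losses compare a.losses
        else a.wins compare b.wins
    }
  }

  // Records from the output above, plus a hypothetical team without stats
  val teams: List[(String, Option[TeamStat])] = List(
    "Arizona"  -> Some(TeamStat(31, 3)),
    "Gonzaga"  -> Some(TeamStat(31, 2)),
    "Duke"     -> Some(TeamStat(29, 4)),
    "Virginia" -> Some(TeamStat(29, 3)),
    "NoStats"  -> None
  )

  // Descending sort, analogous to sortBy(v => v._2.stats, false) on the RDD
  val ranked: List[String] = teams.sortBy(_._2)(winsThenFewerLosses.reverse).map(_._1)
}
```

Here ranked is Gonzaga, Arizona, Virginia, Duke, NoStats. On the RDD, the same object could be passed in place of winsOrdering in step 8, together with classTag[Option[TeamStat]].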
