The aggregateMessages operator

Once we have our graph ready, let's start our mission, which is aggregating the stats data in scoreGraph. In GraphX, aggregateMessages is the operator for that kind of job.

For example, let's find out the average field goals made per game by the winning teams. In other words, the games that the teams lost will not be counted. To get the average for each team, we first need to have the number of games won by the team and the total field goals that the team made in those games:

// Aggregate the total field goals made by winning teams
type Msg = (Int, Int)
type Context = EdgeContext[String, FullResult, Msg]
val winningFieldGoalMade: VertexRDD[Msg] = scoreGraph.aggregateMessages(
    // sendMsg: each game sends (1, field goals made) to its winner
    (ec: Context) => ec.sendToSrc((1, ec.attr.winnerStats.fieldGoalMade)),

    // mergeMsg: add up the game counts and the field goal totals
    (x: Msg, y: Msg) => (x._1 + y._1, x._2 + y._2)
)

EdgeContext

There is a lot going on in the previous call to aggregateMessages. So, let's see it working in slow motion. When we called aggregateMessages on scoreGraph, we had to pass two functions as arguments.

The first function has a signature EdgeContext[VD, ED, Msg] => Unit. It takes an EdgeContext parameter as input. It does not return anything but it can produce side effects, such as sending a message to a node.

Ok, but what is that EdgeContext type? Similar to EdgeTriplet, EdgeContext represents an edge along with its neighboring nodes. It can access both the edge attribute, and the source and destination nodes' attributes. In addition, EdgeContext has two methods to send messages along the edge to its source node or to its destination node. These methods are sendToSrc and sendToDst respectively. Then, the type of message that we want each triplet in the graph to send is defined by Msg. Similar to VD and ED, we can define the concrete type that Msg takes.

In our example, we need to aggregate the number of games played and the number of field goals made. Therefore, we define Msg as a pair of Int. Furthermore, each edge context sends a message to only its source node, that is the winning team, because we are interested in the total field goals made by the teams for only the games that they won. The actual message sent to each winner node is a pair of integers (1, ec.attr.winnerStats.fieldGoalMade). The first integer serves as a counter for the games won by the source node, whereas the second one corresponds to the number of field goals made by the winner. This latter integer is then extracted from the edge attribute.

In addition to sendMsg, the second function that we need to pass to aggregateMessages is a mergeMsg function with the signature (Msg, Msg) => Msg. As its name implies, mergeMsg is used to merge two messages received at a node into a new one. Its output type must be the same as its input type, that is, Msg. Using these two functions, aggregateMessages returns the aggregated messages inside a VertexRDD[Msg].
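The interplay between sendMsg and mergeMsg can be modeled with ordinary Scala collections. The following is only a sketch of the semantics, not how GraphX is implemented: the Game class, its fields, and the sample numbers are hypothetical, and no Spark is involved.

```scala
// Plain-Scala model of what aggregateMessages computes; no Spark required.
// Game and its fields are hypothetical stand-ins for the book's edge data.
object AggregateModel {
  type Msg = (Int, Int) // (games won, field goals made)

  // One edge: the winner is the source, the loser is the destination.
  final case class Game(winnerId: Long, loserId: Long, winnerFieldGoals: Int)

  val games: List[Game] = List(
    Game(1L, 2L, 30),
    Game(1L, 3L, 20),
    Game(2L, 3L, 25)
  )

  // sendMsg: each edge emits one message addressed to its source (the winner).
  def sendMsg(g: Game): (Long, Msg) = (g.winnerId, (1, g.winnerFieldGoals))

  // mergeMsg: combine two messages that arrived at the same vertex.
  def mergeMsg(x: Msg, y: Msg): Msg = (x._1 + y._1, x._2 + y._2)

  // Group the messages by receiving vertex, then fold each group with mergeMsg.
  val totals: Map[Long, Msg] =
    games.map(sendMsg)
      .groupBy(_._1)
      .map { case (id, msgs) => (id, msgs.map(_._2).reduce(mergeMsg)) }
}
```

In this model, team 3 never wins, so it receives no message and does not appear in totals. GraphX behaves the same way: vertices that receive no message are absent from the returned VertexRDD.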

Returning to our example, we set out to compute the average field goals per winning game for all teams. To get this final result, we simply apply mapValues to the output of aggregateMessages, as follows:

// Average field goals made per game by winning teams
val avgWinningFieldGoalMade: VertexRDD[Double] =
    winningFieldGoalMade.mapValues(
        (id: VertexId, x: Msg) => x match {
            case (count: Int, total: Int) => total.toDouble / count
        })

Let's check the output:

scala> avgWinningFieldGoalMade.take(5).foreach(println)
(1260,24.71641791044776)
(1410,23.56578947368421)
(1426,26.239436619718308)
(1166,26.137614678899084)
(1434,25.34285714285714)
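The conversion to Double in that last step matters, because dividing two Int values truncates the result. Here is a minimal sketch of the same averaging step on a plain Scala Map, with made-up totals standing in for the aggregated messages:

```scala
// Averaging (count, total) pairs; the numbers are made up for illustration.
object AverageFromTotals {
  // (games won, total field goals) keyed by a hypothetical team id.
  val totals: Map[Long, (Int, Int)] = Map((1L, (2, 51)), (2L, (1, 25)))

  // Convert the total to Double before dividing to keep the fraction.
  val averages: Map[Long, Double] =
    totals.map { case (id, (count, total)) => (id, total.toDouble / count) }

  // Without toDouble, integer division silently drops the remainder.
  val truncated: Int = 51 / 2
}
```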

For reference, the definitions of aggregateMessages and EdgeContext that we just explained are as follows:

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}



abstract class EdgeContext[VD, ED, A] {

    // Attribute associated with the edge.
    def attr: ED

    // Vertex attribute of the edge's source vertex.
    def srcAttr: VD

    // Vertex attribute of the edge's destination vertex.
    def dstAttr: VD

    // Vertex id of the edge's source vertex.
    def srcId: VertexId

    // Vertex id of the edge's destination vertex.
    def dstId: VertexId

    // Sends a message to the destination vertex.
    def sendToDst(msg: A): Unit

    // Sends a message to the source vertex.
    def sendToSrc(msg: A): Unit
}
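To see why sendToSrc and sendToDst can return Unit and still get work done, consider a toy stand-in for EdgeContext. Everything here is an assumption made for illustration: ToyEdgeContext, its inbox parameter, and the choice to merge messages on delivery are not part of GraphX and say nothing about its internals.

```scala
import scala.collection.mutable

object ToyContext {
  // A simplified, hypothetical stand-in for GraphX's EdgeContext.
  // Messages land in a shared inbox keyed by vertex id.
  final class ToyEdgeContext[VD, ED, A](
      val srcId: Long, val dstId: Long,
      val srcAttr: VD, val dstAttr: VD, val attr: ED,
      inbox: mutable.Map[Long, A], mergeMsg: (A, A) => A) {

    // Store the message, merging it with any message already waiting.
    private def deliver(id: Long, msg: A): Unit =
      inbox.update(id, inbox.get(id).map(mergeMsg(_, msg)).getOrElse(msg))

    // Both methods return Unit: their whole effect is the inbox update.
    def sendToSrc(msg: A): Unit = deliver(srcId, msg)
    def sendToDst(msg: A): Unit = deliver(dstId, msg)
  }

  val inbox = mutable.Map.empty[Long, Int]
  val ctx = new ToyEdgeContext[String, Int, Int](1L, 2L, "A", "B", 7, inbox, _ + _)

  ctx.sendToSrc(ctx.attr) // vertex 1 receives the edge attribute, 7
  ctx.sendToDst(1)        // vertex 2 receives 1
  ctx.sendToDst(1)        // merged with the previous message, giving 2
}
```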

Abstracting out the aggregation

That was kinda cool! We can do the same to average the points per game scored by winning teams:

// Aggregate the points scored by winning teams
val winnerTotalPoints: VertexRDD[(Int, Int)] = scoreGraph.aggregateMessages[(Int, Int)](
    // sendMsg
    triplet => triplet.sendToSrc((1, triplet.attr.winnerStats.score)),
    // mergeMsg
    (x, y) => (x._1 + y._1, x._2 + y._2)
)

// Average points per game scored by winning teams
val winnersPPG: VertexRDD[Double] =
    winnerTotalPoints.mapValues(
        (id: VertexId, x: (Int, Int)) => x match {
            case (count: Int, total: Int) => total.toDouble / count
        })

Let's check the output:

scala> winnersPPG.take(5).foreach(println)
(1260,71.19402985074628)
(1410,71.11842105263158)
(1426,76.30281690140845)
(1166,76.89449541284404)
(1434,74.28571428571429)

Now, the coach wants us to list the top five teams with the highest average number of three-pointers made per winning game. By the way, he also wants to know which teams are the most efficient in three-pointers.

Keeping things DRY

We can copy and modify the previous code but that would be repetitive. Instead, let's abstract out the average aggregation operator so that it can work on any statistics that the coach needs. Luckily, Scala's higher-order functions are there to help in this task.

For each statistic that our coach wants, let's define a function that takes a team's GameStats as input and returns the statistic that we are interested in. For now, we will need the number of three-pointers made and the average three-pointer percentage:

// Getting individual stats
def threePointMade(stats: GameStats) = stats.threePointerMade
def threePointPercent(stats: GameStats) = stats.tpPercent

Then, we create a generic function that takes as inputs a stats graph and one of the functions defined previously, which has a signature GameStats => Double:

// Generic function for stats averaging
def averageWinnerStat(graph: Graph[String, FullResult])(getStat: GameStats => Double): VertexRDD[Double] = {
    type Msg = (Int, Double)
    val winningScore: VertexRDD[Msg] = graph.aggregateMessages[Msg](
        // sendMsg: send (1, chosen stat) to the winner of each game
        triplet => triplet.sendToSrc((1, getStat(triplet.attr.winnerStats))),
        // mergeMsg: add up the game counts and the stat totals
        (x, y) => (x._1 + y._1, x._2 + y._2)
    )
    winningScore.mapValues(
        (id: VertexId, x: Msg) => x match {
            case (count: Int, total: Double) => total / count
        })
}

Then, we can use the average stats by passing the functions threePointMade and threePointPercent to averageWinnerStat:

val winnersThreePointMade =
    averageWinnerStat(scoreGraph)(threePointMade)
val winnersThreePointPercent =
    averageWinnerStat(scoreGraph)(threePointPercent)
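The two parameter lists of averageWinnerStat are what make it convenient to reuse. As a rough illustration of the same pattern without Spark, the sketch below averages over a plain Seq. The Stats class, the average function, and the sample values are hypothetical and are not the book's GameStats or averageWinnerStat.

```scala
object CurriedAverage {
  // Hypothetical, trimmed-down stand-in for the book's GameStats.
  final case class Stats(score: Int, threePointerMade: Int)

  // The first parameter list fixes the data; the second picks the statistic.
  // For an empty Seq this evaluates to NaN (0.0 / 0).
  def average(games: Seq[Stats])(getStat: Stats => Double): Double =
    games.map(getStat).sum / games.size

  val games = Seq(Stats(70, 8), Stats(80, 10))

  // Supply the data once, then reuse the result with different extractors.
  val averageOf: (Stats => Double) => Double = average(games)

  val avgScore: Double  = averageOf(_.score.toDouble)
  val avgThrees: Double = averageOf(_.threePointerMade.toDouble)
}
```

Fixing the graph in the first list and varying only the extractor in the second is the design choice that lets each new statistic be answered with a one-line call.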

With little effort, we can tell the coach which five winning teams scored the highest number of threes per game:

scala> winnersThreePointMade.sortBy(_._2,false).take(5).foreach(println)
(1440,11.274336283185841)
(1125,9.521929824561404)
(1407,9.008849557522124)
(1172,8.967441860465117)
(1248,8.915384615384616)

While we are at it, let's find out the five most efficient teams in three-pointers:

scala> winnersThreePointPercent.sortBy(_._2,false).take(5).foreach(println)
(1101,46.90555728464225)
(1147,44.224282479431224)
(1294,43.754532434101534)
(1339,43.52308905887638)
(1176,43.080814169045105)

Interestingly, the teams that made the most three-pointers per winning game are not always the ones that are the most efficient at shooting them. Still, they won those games, which is more important.

Coach wants more numbers

The coach seems unsatisfied with that argument and wants the same statistics, this time averaged over all the games that each team has played.

Thus, we have to aggregate the information at all the nodes of our graph, and not only at the source nodes, which are the winners. To make our previous abstraction more flexible, let's create the following types:

trait Teams
case class Winners() extends Teams
case class Losers() extends Teams
case class AllTeams() extends Teams

We modify the previous higher-order function to have an extra argument Teams, which will help us specify at which nodes we want to collect and aggregate the required game stats. The new function becomes:

def averageStat(graph: Graph[String, FullResult])(getStat: GameStats => Double, tms: Teams): VertexRDD[Double] = {
    type Msg = (Int, Double)
    val aggrStats: VertexRDD[Msg] = graph.aggregateMessages[Msg](
        // sendMsg
        tms match {
            case _: Winners => t => t.sendToSrc((1, getStat(t.attr.winnerStats)))
            case _: Losers  => t => t.sendToDst((1, getStat(t.attr.loserStats)))
            case _          => t => {
                t.sendToSrc((1, getStat(t.attr.winnerStats)))
                t.sendToDst((1, getStat(t.attr.loserStats)))
            }
        },
        // mergeMsg
        (x, y) => (x._1 + y._1, x._2 + y._2)
    )

    aggrStats.mapValues(
        (id: VertexId, x: Msg) => x match {
            case (count: Int, total: Double) => total / count
        })
}

Compared to averageWinnerStat, averageStat allows us to choose whether we want to aggregate the stats for winners only, for losers only, or for all teams. Since the coach wants the overall stats averaged over all games played, we aggregate the stats by passing the AllTeams() flag to averageStat. In this case, we simply define the sendMsg argument in aggregateMessages so that the required stats are sent to both the source (the winner) and the destination (the loser) using the EdgeContext class's sendToSrc and sendToDst functions respectively. This mechanism is pretty straightforward. We just need to make sure we send the right information to the right node. In this case, we send winnerStats to the winner and loserStats to the loser.
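The routing logic can be checked on a tiny example with plain Scala collections. This is a sketch under stated assumptions: the Game class and the two sample games are made up, the function works on a List rather than a graph, and the selector is modeled with case objects, which is an alternative to the empty case classes used in this chapter.

```scala
object TeamSelector {
  // Selector modeled with case objects (an alternative to empty case classes).
  sealed trait Teams
  case object Winners  extends Teams
  case object Losers   extends Teams
  case object AllTeams extends Teams

  // Hypothetical edge: winner is the source, loser is the destination.
  final case class Game(winner: Long, loser: Long, winnerPts: Double, loserPts: Double)
  type Msg = (Int, Double) // (games counted, points total)

  // Messages that one game produces for the chosen selection.
  def send(g: Game, who: Teams): List[(Long, Msg)] = who match {
    case Winners  => List((g.winner, (1, g.winnerPts)))
    case Losers   => List((g.loser, (1, g.loserPts)))
    case AllTeams => List((g.winner, (1, g.winnerPts)), (g.loser, (1, g.loserPts)))
  }

  def mergeMsg(x: Msg, y: Msg): Msg = (x._1 + y._1, x._2 + y._2)

  // Route the messages, merge them per team, then divide total by count.
  def averageStat(games: List[Game], who: Teams): Map[Long, Double] =
    games.flatMap(send(_, who))
      .groupBy(_._1)
      .map { case (id, msgs) =>
        val (count, total) = msgs.map(_._2).reduce(mergeMsg)
        (id, total / count)
      }

  // Team 1 beats team 2 by 80 to 70, then team 2 beats team 1 by 90 to 60.
  val games = List(Game(1L, 2L, 80.0, 70.0), Game(2L, 1L, 90.0, 60.0))
}
```

With AllTeams, each game contributes one message to each of its two teams, so the count in every merged message equals the number of games that team played.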

Ok, you've got the idea now. So, let's apply it to please our coach. Here are the teams with the highest overall number of three-pointers made per game:

// Average Three Point Made Per Game for All Teams 
val allThreePointMade = averageStat(scoreGraph)(threePointMade, AllTeams())   

Let's see the output:

scala> allThreePointMade.sortBy(_._2, false).take(5).foreach(println)
(1440,10.180811808118081)
(1125,9.098412698412698)
(1172,8.575657894736842)
(1184,8.428571428571429)
(1407,8.411149825783973) 

Here are the five teams with the best overall three-point percentage:

// Average Three Point Percent for All Teams
val allThreePointPercent = averageStat(scoreGraph)(threePointPercent, AllTeams())

The output is:

scala> allThreePointPercent.sortBy(_._2,false).take(5).foreach(println)
(1429,38.8351815824302)
(1323,38.522819895594)
(1181,38.43052051444854)
(1294,38.41227053353959)
(1101,38.097896464168954)

Actually, there is only a 2 percent difference between the most efficient team and the one in the fiftieth position. Most NCAA teams are therefore pretty efficient behind the line. I bet the coach knew that already!

Calculating average points per game

We can also reuse the averageStat function to get the average points per game for the winners. In particular, let's take a look at the two teams with the highest and lowest average scores in the games that they won:

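// Points scored in a game; assumed to be defined like the other stat getters
def score(stats: GameStats) = stats.score

// Winning teams
val winnerAvgPPG = averageStat(scoreGraph)(score, Winners())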

Let's check the output:

scala> winnerAvgPPG.max()(Ordering.by(_._2))
res36: (org.apache.spark.graphx.VertexId, Double) = (1322,90.73333333333333)

scala> winnerAvgPPG.min()(Ordering.by(_._2))
res39: (org.apache.spark.graphx.VertexId, Double) = (1197,60.5)

Apparently, the most defensive team can win games while averaging only about 60 points, whereas the most offensive team averages about 90 points in its wins.
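The max()(Ordering.by(_._2)) and min()(Ordering.by(_._2)) calls compare the (id, average) pairs by their second component only. On an RDD, max and min take the ordering in a second, implicit parameter list, hence the empty first pair of parentheses. The same idea on a plain Scala List looks like the sketch below, where the sample pairs are made up:

```scala
object OrderingDemo {
  // (team id, average points per game); made-up numbers.
  val avgPPG: List[(Long, Double)] = List((1L, 71.5), (2L, 90.0), (3L, 60.5))

  // Compare pairs by their second component, ignoring the id.
  val byAverage: Ordering[(Long, Double)] =
    Ordering.by[(Long, Double), Double](_._2)

  val best: (Long, Double)  = avgPPG.max(byAverage)
  val worst: (Long, Double) = avgPPG.min(byAverage)

  // Reversing the ordering sorts from highest to lowest average.
  val topTwo: List[(Long, Double)] = avgPPG.sorted(byAverage.reverse).take(2)
}
```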

Next, let's average the points per game for all games played and look at the two teams with the best and worst offense during the 2015 season:

// Average Points Per Game of All Teams
val allAvgPPG = averageStat(scoreGraph)(score, AllTeams())

The output is:

scala> allAvgPPG.max()(Ordering.by(_._2))
res42: (org.apache.spark.graphx.VertexId, Double) = (1322,83.81481481481481)

scala> allAvgPPG.min()(Ordering.by(_._2))
res43: (org.apache.spark.graphx.VertexId, Double) = (1212,51.111111111111114)

Unsurprisingly, the best offensive team is the same one that scored the most in its winning games. At the other end, an average of about 51 points per game is rarely enough to win.

Defense stats – D matters as in direction

Previously, we obtained some statistics such as field goals or the three-point percentages that a team achieves. What if instead we want to aggregate the average points or rebounds that each team concedes to their opponents? To compute that, we define a new higher-order function averageConcededStat. Compared to averageStat, this function needs to send loserStats to the winning team and winnerStats to the losing team. To make things more interesting, we are going to make the team name part of the message Msg:

def averageConcededStat(graph: Graph[String, FullResult])(getStat: GameStats => Double, rxs: Teams): VertexRDD[(String, Double)] = {
    type Msg = (Int, Double, String)
    val aggrStats: VertexRDD[Msg] = graph.aggregateMessages[Msg](
        // sendMsg: each team receives its opponent's stats plus its own name
        rxs match {
            case _: Winners => t => t.sendToSrc((1, getStat(t.attr.loserStats), t.srcAttr))
            case _: Losers  => t => t.sendToDst((1, getStat(t.attr.winnerStats), t.dstAttr))
            case _          => t => {
                t.sendToSrc((1, getStat(t.attr.loserStats), t.srcAttr))
                t.sendToDst((1, getStat(t.attr.winnerStats), t.dstAttr))
            }
        },
        // mergeMsg: every message sent to a node carries that node's own name,
        // so keeping the name from x is enough
        (x, y) => (x._1 + y._1, x._2 + y._2, x._3)
    )

    aggrStats.mapValues(
        (id: VertexId, x: Msg) => x match {
            case (count: Int, total: Double, name: String) => (name, total / count)
        })
}
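Two details of averageConcededStat are easy to miss: the stats travel in the opposite direction compared to averageStat, and mergeMsg keeps only the first name it sees. The plain-Scala sketch below illustrates both with two made-up games. The Game class and its fields are hypothetical, and only the all-teams case is modeled.

```scala
object ConcededModel {
  type Msg = (Int, Double, String) // (games, points conceded, receiver's name)

  // Hypothetical edge carrying both team ids, names, and scores.
  final case class Game(winnerId: Long, winnerName: String, winnerPts: Double,
                        loserId: Long, loserName: String, loserPts: Double)

  // Each team receives the points scored by its opponent, tagged with its own name.
  def send(g: Game): List[(Long, Msg)] = List(
    (g.winnerId, (1, g.loserPts, g.winnerName)),
    (g.loserId, (1, g.winnerPts, g.loserName))
  )

  // All messages for one team carry the same name, so x._3 equals y._3 here.
  def mergeMsg(x: Msg, y: Msg): Msg = (x._1 + y._1, x._2 + y._2, x._3)

  // Team A beats team B by 80 to 70, then team B beats team A by 90 to 60.
  val games = List(
    Game(1L, "A", 80.0, 2L, "B", 70.0),
    Game(2L, "B", 90.0, 1L, "A", 60.0)
  )

  val conceded: Map[Long, (String, Double)] =
    games.flatMap(send)
      .groupBy(_._1)
      .map { case (id, msgs) =>
        val (count, total, name) = msgs.map(_._2).reduce(mergeMsg)
        (id, (name, total / count))
      }
}
```

Team A concedes 70 and 90 points for an average of 80, while team B concedes 80 and 60 for an average of 70.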

With that, we can calculate the average points conceded by the winning and losing teams as follows:

val winnersAvgConcededPoints = averageConcededStat(scoreGraph)(score, Winners())
val losersAvgConcededPoints = averageConcededStat(scoreGraph)(score, Losers())

Let's check the output:

scala> losersAvgConcededPoints.min()(Ordering.by(_._2))
res: (VertexId, (String, Double)) = (1101,(Abilene Chr,74.04761904761905))

scala> winnersAvgConcededPoints.min()(Ordering.by(_._2))
res: (org.apache.spark.graphx.VertexId, (String, Double)) = (1101,(Abilene Chr,74.04761904761905))

scala> losersAvgConcededPoints.max()(Ordering.by(_._2))
res: (VertexId, (String, Double)) = (1464,(Youngstown St,78.85714285714286))

scala> winnersAvgConcededPoints.max()(Ordering.by(_._2))
res: (VertexId, (String, Double)) = (1464,(Youngstown St,71.125))

The previous output tells us that Abilene Christian University is the most defensive team. They concede the fewest points whether they win a game or not. On the other hand, Youngstown State has the worst defense.
