Chapter 5. Creating Custom Graph Aggregation Operators

In the previous chapter, we saw various operations for transforming the elements of a graph and modifying its structure. Here, we will learn to use a generic and powerful operator named aggregateMessages, which aggregates the neighborhood information of every node in the graph. Many graph-processing algorithms rely on iteratively accessing the properties of neighboring nodes and adjacent edges; the PageRank algorithm is one such example.
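To make the send/merge idea concrete before we meet the real operator, here is a minimal plain-Scala sketch that uses only the standard library. It does not use GraphX: `SimpleEdge` and `aggregate` are hypothetical names that only mimic the two functions aggregateMessages expects from the user, a sendMsg that emits messages along each edge and a mergeMsg that combines the messages arriving at the same vertex. In GraphX itself, sendMsg calls `sendToSrc` and `sendToDst` on an `EdgeContext` instead of returning pairs.

```scala
object AggregateSketch {
  // Hypothetical edge type for this sketch; it is NOT GraphX's Edge class.
  final case class SimpleEdge[E](src: Long, dst: Long, attr: E)

  // Mimics the send/merge semantics of aggregateMessages on a local Seq.
  def aggregate[E, A](
      edges: Seq[SimpleEdge[E]],
      sendMsg: SimpleEdge[E] => Seq[(Long, A)], // (receiving vertex, message) pairs
      mergeMsg: (A, A) => A                     // should be associative and commutative
  ): Map[Long, A] =
    edges
      .flatMap(sendMsg)   // emit zero or more messages per edge
      .groupBy(_._1)      // collect the messages addressed to each vertex
      .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(mergeMsg) }

  def main(args: Array[String]): Unit = {
    // Toy graph with edges 1->2, 1->3, 1->4, 2->3 (no attributes).
    val edges = Seq(
      SimpleEdge(1L, 2L, ()), SimpleEdge(1L, 3L, ()),
      SimpleEdge(1L, 4L, ()), SimpleEdge(2L, 3L, ()))
    // Degree of every vertex: each edge sends 1 to both endpoints; merge sums.
    val degrees = aggregate[Unit, Int](edges, e => Seq(e.src -> 1, e.dst -> 1), _ + _)
    println(degrees)
  }
}
```

As with aggregateMessages, a vertex that receives no message simply does not appear in the result: sending only to `e.dst` in the example above would leave vertex 1 out entirely.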

By applying aggregateMessages to the NCAA College Basketball datasets, you will be able to:

  • Understand the basic mechanisms and patterns of aggregateMessages
  • Apply it to create custom graph aggregation operations
  • Optimize the performance and efficiency of aggregateMessages

NCAA College Basketball datasets

We will again learn by doing in this chapter. This time, we will use NCAA College Basketball as an illustrative example. Specifically, we use two CSV datasets. The first one, teams.csv, contains the list of all college teams that played in the NCAA Division I competition; each team is associated with a four-digit ID number. The second dataset, stats.csv, contains the score and statistics of every game during the 2014-2015 regular season. Using the techniques learned in Chapter 2, Building and Exploring Graphs, let's parse these datasets and load them into RDDs:

  1. We create a class GameStats that records the statistics of one team during a specific basketball game:
    case class GameStats(
        val score: Int,
        val fieldGoalMade:   Int,
        val fieldGoalAttempt: Int, 
        val threePointerMade: Int,
        val threePointerAttempt: Int,
        val threeThrowsMade: Int,
        val threeThrowsAttempt: Int, 
        val offensiveRebound: Int,
        val defensiveRebound: Int,
        val assist: Int,
        val turnOver: Int,
        val steal: Int,
        val block: Int,
        val personalFoul: Int
    )
  2. We also add the following methods to the body of GameStats to measure how efficient a team's offense was during a game:
    // Field Goal percentage
    def fgPercent: Double = 100.0 * fieldGoalMade / fieldGoalAttempt
    
    // Three Point percentage
    def tpPercent: Double = 100.0 * threePointerMade / threePointerAttempt
    
    // Free throw percentage
    def ftPercent: Double = 100.0 * threeThrowsMade / threeThrowsAttempt

    // Display only the score when a GameStats is printed
    override def toString: String = "Score: " + score
  3. We now create a couple of classes for the game results:
    abstract class GameResult(
        val season:     Int, 
        val day:        Int,
        val loc:        String
    )
    
    case class FullResult(
        override val season:    Int, 
        override val day:       Int,
        override val loc:       String, 
        val winnerStats:        GameStats,
        val loserStats:         GameStats 
    ) extends GameResult(season, day, loc)

    FullResult holds the season (year) and day of the game, the location where it was played, and the game statistics of both the winning and losing teams.

  4. We then create a statistics graph of the regular seasons, in which the nodes are the teams and each edge corresponds to a specific game. To create the graph, let's first parse the CSV file teams.csv into the RDD teams:
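    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    val teams: RDD[(VertexId, String)] =
        sc.textFile("./data/teams.csv").
        filter(! _.startsWith("#")).        // skip comment/header lines
        map {line =>
            val row = line split ','
            (row(0).toLong, row(1))         // (team ID, team name)
        }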
  5. We can check the first few teams in this new RDD:
    scala> teams.take(3).foreach{println}
    (1101,Abilene Chr)
    (1102,Air Force)
    (1103,Akron)
    
  6. We do the same thing to obtain an RDD of the game results, of type RDD[Edge[FullResult]]. We parse stats.csv and keep only the fields we need: the ID of the winning team, the ID of the losing team, and the game statistics of both teams:
    val detailedStats: RDD[Edge[FullResult]] =
      sc.textFile("./data/stats.csv").
      filter(! _.startsWith("#")).
      map {line =>
          val row = line split ','
          Edge(row(2).toInt, row(4).toInt, 
              FullResult(
                  row(0).toInt, row(1).toInt, 
                  row(6),
                  GameStats(      
                                  score = row(3).toInt,
                          fieldGoalMade = row(8).toInt,
                       fieldGoalAttempt = row(9).toInt, 
                       threePointerMade = row(10).toInt,
                    threePointerAttempt = row(11).toInt,   
                        threeThrowsMade = row(12).toInt,
                     threeThrowsAttempt = row(13).toInt, 
                       offensiveRebound = row(14).toInt,
                       defensiveRebound = row(15).toInt,
                                 assist = row(16).toInt,
                               turnOver = row(17).toInt,
                                  steal = row(18).toInt,
                                  block = row(19).toInt,
                           personalFoul = row(20).toInt
                  ),
                  GameStats(
                                  score = row(5).toInt,
                          fieldGoalMade = row(21).toInt,
                       fieldGoalAttempt = row(22).toInt, 
                       threePointerMade = row(23).toInt,
                    threePointerAttempt = row(24).toInt,
                        threeThrowsMade = row(25).toInt,
                     threeThrowsAttempt = row(26).toInt, 
                       offensiveRebound = row(27).toInt,
                       defensiveRebound = row(28).toInt,
                                 assist = row(29).toInt,
                               turnOver = row(30).toInt,
                                  steal = row(31).toInt,
                                  block = row(32).toInt,
                           personalFoul = row(33).toInt
                  )
              )
          )
      }

    Let's check what we have got:

    scala> detailedStats.take(3).foreach(println)
    Edge(1165,1384,FullResult(2006,8,N,Score: 75-54))
    Edge(1393,1126,FullResult(2006,8,H,Score: 68-37))
    Edge(1107,1324,FullResult(2006,9,N,Score: 90-73))
    
  7. We then create our graph of stats:
    scala> val scoreGraph = Graph(teams, detailedStats)
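
Because step 2 lists the percentage methods separately from the class, here is a minimal, self-contained sketch of GameStats with the methods placed inside the class body, followed by a small demo. The object name GameStatsDemo and the box-score numbers are made up for illustration. The demo also shows a consequence of the Double division: a team that attempts no three-pointers gets NaN rather than an exception, which is worth keeping in mind before averaging these percentages across games.

```scala
// GameStats as in steps 1 and 2, with the methods inside the class body.
case class GameStats(
    score: Int,
    fieldGoalMade: Int,
    fieldGoalAttempt: Int,
    threePointerMade: Int,
    threePointerAttempt: Int,
    threeThrowsMade: Int,
    threeThrowsAttempt: Int,
    offensiveRebound: Int,
    defensiveRebound: Int,
    assist: Int,
    turnOver: Int,
    steal: Int,
    block: Int,
    personalFoul: Int
) {
  // Field goal percentage
  def fgPercent: Double = 100.0 * fieldGoalMade / fieldGoalAttempt
  // Three-point percentage
  def tpPercent: Double = 100.0 * threePointerMade / threePointerAttempt
  // Free throw percentage
  def ftPercent: Double = 100.0 * threeThrowsMade / threeThrowsAttempt
  override def toString: String = "Score: " + score
}

object GameStatsDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical box score: 27 field goals (no threes) + 21 free throws = 75 points.
    val s = GameStats(score = 75, fieldGoalMade = 27, fieldGoalAttempt = 60,
      threePointerMade = 0, threePointerAttempt = 0,
      threeThrowsMade = 21, threeThrowsAttempt = 28,
      offensiveRebound = 10, defensiveRebound = 25, assist = 14,
      turnOver = 12, steal = 6, block = 3, personalFoul = 18)
    println(s)            // Score: 75
    println(s.fgPercent)  // 45.0
    println(s.ftPercent)  // 75.0
    println(s.tpPercent)  // NaN, since 0.0 / 0 is NaN for Double
  }
}
```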
    

Out of curiosity, let's see which teams beat Duke, the 2015 NCAA champions, during the regular season. To do that, we filter the graph triplets whose destination attribute is Duke. This works because, when we created our stats graph, we directed each edge from the winner node to the loser node. As the output shows, Duke lost only four games in the regular season:

scala> scoreGraph.triplets.filter(_.dstAttr == "Duke").collect.foreach(println)
((1274,Miami FL),(1181,Duke),FullResult(2015,71,A,Score: 90-74))
((1301,NC State),(1181,Duke),FullResult(2015,69,H,Score: 87-75))
((1323,Notre Dame),(1181,Duke),FullResult(2015,86,H,Score: 77-73))
((1323,Notre Dame),(1181,Duke),FullResult(2015,130,N,Score: 74-64))
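The triplet filter answers the question for one team. To count the losses of every team at once, we can let each edge send a message to its loser, which is exactly the pattern aggregateMessages is built for. The following is a minimal sketch, assuming Spark with GraphX on the classpath and a local master; the object name LossCountSketch and the three teams are hypothetical toy data, not the NCAA files. As in scoreGraph, edges point from winner to loser.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

object LossCountSketch {
  // Counts the losses of each team in a tiny hypothetical graph.
  def lossesByName(sc: SparkContext): Map[String, Int] = {
    val teams: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "Team A"), (2L, "Team B"), (3L, "Team C")))
    // Winner -> loser: A beat B, A beat C, C beat B.
    val games: RDD[Edge[String]] = sc.parallelize(Seq(
      Edge(1L, 2L, "game 1"), Edge(1L, 3L, "game 2"), Edge(3L, 2L, "game 3")))
    val g = Graph(teams, games)

    val losses: VertexRDD[Int] = g.aggregateMessages[Int](
      ctx => ctx.sendToDst(1), // every game sends 1 to its loser
      _ + _,                   // sum the messages received by each team
      TripletFields.None       // sendMsg reads no vertex or edge attributes
    )
    losses.join(teams).map { case (_, (n, name)) => name -> n }.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("loss-count").setMaster("local[*]"))
    try lossesByName(sc).foreach { case (name, n) => println(s"$name lost $n game(s)") }
    finally sc.stop()
  }
}
```

For plain counts, inDegrees would return the same numbers; aggregateMessages becomes necessary once the message carries more than a 1, for example the points a team conceded in each game. Team A, which never lost, is absent from the result, because vertices that receive no message do not appear in the returned VertexRDD.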