Performance optimization

In addition to the sendMsg and mergeMsg functions, aggregateMessages takes an optional third argument, tripletFields, of type TripletFields. It tells GraphX which data the sendMsg function reads from the EdgeContext. The main reason to specify this explicitly is performance: it lets GraphX optimize the aggregateMessages operation.

Concretely, a TripletFields value represents a subset of the fields of EdgeTriplet, which lets GraphX populate only the fields that are actually needed.

The default value is TripletFields.All, which means that sendMsg may access any field of EdgeContext. Any other value tells GraphX that only part of the EdgeContext is required, so it can choose a more efficient join strategy, for example by not shipping vertex attributes that sendMsg never reads. The possible options are:

  • TripletFields.All: exposes all the fields (source, edge, and destination)
  • TripletFields.Dst: exposes the destination and edge fields but not the source field
  • TripletFields.EdgeOnly: exposes only the edge field, not the source or destination field
  • TripletFields.None: exposes none of the triplet fields
  • TripletFields.Src: exposes the source and edge fields but not the destination field
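Each option in this list is simply a combination of three flags. In the GraphX source, TripletFields is a small Java class whose predefined values set three public boolean fields: useSrc, useDst and useEdge. The sketch below prints them for every option; it assumes only that spark-graphx is on the classpath (no SparkContext is needed), and the object name TripletFieldsFlags is our own.

```scala
import org.apache.spark.graphx.TripletFields

// Hypothetical helper object: lists the predefined TripletFields values by name
object TripletFieldsFlags {
  val options: Seq[(String, TripletFields)] = Seq(
    "All"      -> TripletFields.All,
    "Src"      -> TripletFields.Src,
    "Dst"      -> TripletFields.Dst,
    "EdgeOnly" -> TripletFields.EdgeOnly,
    "None"     -> TripletFields.None
  )

  def main(args: Array[String]): Unit =
    // Print which parts of the triplet each option exposes to sendMsg
    options.foreach { case (name, tf) =>
      println(f"$name%-8s src=${tf.useSrc}%-5s dst=${tf.useDst}%-5s edge=${tf.useEdge}")
    }
}
```

Each printed row should agree with the corresponding bullet above: for instance, TripletFields.Src sets useSrc and useEdge but not useDst.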

Going back to our previous example, computing the total number of wins and losses of each team does not require reading any attribute from the EdgeContext: each game simply sends the constant 1 to its winner (the source vertex) or to its loser (the destination vertex). We can therefore pass TripletFields.None:

// Number of wins of each team: every game sends 1 to its winner (the source)
val numWins: VertexRDD[Int] = scoreGraph.aggregateMessages[Int](
    triplet => {
        triplet.sendToSrc(1)    // a constant message; no attribute is read
    },
    (x, y) => x + y,            // sum the messages received by each team
    TripletFields.None
)

// Number of losses of each team: every game sends 1 to its loser (the destination)
val numLosses: VertexRDD[Int] = scoreGraph.aggregateMessages[Int](
    triplet => {
        triplet.sendToDst(1)    // a constant message; no attribute is read
    },
    (x, y) => x + y,            // sum the messages received by each team
    TripletFields.None
)
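The two calls above each scan every edge once. Since neither reads an attribute, they can be folded into a single aggregateMessages pass whose message is a (wins, losses) pair. The following is a self-contained sketch of that variant, not the book's code: it runs Spark in local mode, builds a hypothetical three-team toy graph in place of scoreGraph, and assumes, as the code above implies, that each edge points from the winner to the loser. The object name WinLossOnePass is our own.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexRDD}

object WinLossOnePass {
  def record(sc: SparkContext): Map[Long, (Int, Int)] = {
    // Hypothetical toy data: vertex attribute = team name,
    // each edge points from the winner (src) to the loser (dst)
    val teams = sc.parallelize(Seq((1L, "A"), (2L, "B"), (3L, "C")))
    val games = sc.parallelize(Seq(Edge(1L, 2L, "g1"), Edge(1L, 3L, "g2"), Edge(2L, 3L, "g3")))
    val graph = Graph(teams, games)

    // One pass: (1, 0) to the winner, (0, 1) to the loser.
    // No vertex or edge attribute is read, so TripletFields.None suffices.
    val winLoss: VertexRDD[(Int, Int)] = graph.aggregateMessages[(Int, Int)](
      ctx => {
        ctx.sendToSrc((1, 0))
        ctx.sendToDst((0, 1))
      },
      (a, b) => (a._1 + b._1, a._2 + b._2), // add wins and losses component-wise
      TripletFields.None
    )
    winLoss.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("win-loss").setMaster("local[*]"))
    try record(sc).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

The trade-off is a slightly larger message (a pair instead of an Int) in exchange for one shuffle of messages instead of two.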

To see that this works, let's print the five teams with the most wins and the five teams with the most losses:

scala> numWins.sortBy(_._2, false).take(5).foreach(println)
(1246,34)
(1437,32)
(1112,31)
(1458,31)
(1211,31)

scala> numLosses.sortBy(_._2, false).take(5).foreach(println)
(1363,28)
(1146,27)
(1212,27)
(1197,27)
(1263,27)

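If we also want the names of the top five teams, sendMsg has to read srcAttr, which holds the name of the winning team. In this case, we set tripletFields to TripletFields.Src, which exposes the source and edge fields but not the destination field.

The result also reveals Kentucky as the team that went undefeated in the regular season: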

val numWinsOfTeams: VertexRDD[(String, Int)] = scoreGraph.aggregateMessages[(String, Int)](
    t => {
        t.sendToSrc((t.srcAttr, 1))       // send the winner's name (source attribute) with a count of 1
    },
    (x, y) => (x._1, x._2 + y._2),        // keep the name, add up the counts
    TripletFields.Src
)

Et voilà:

scala> numWinsOfTeams.sortBy(_._2._2, false).take(5).foreach(println)
(1246,(Kentucky,34))
(1437,(Villanova,32))
(1112,(Arizona,31))
(1458,(Wisconsin,31))
(1211,(Gonzaga,31))

scala> numWinsOfTeams.sortBy(_._2._2).take(5).foreach(println)
(1146,(Cent Arkansas,2))
(1197,(Florida A&M,2))
(1398,(Tennessee St,3))
(1263,(Maine,3))
(1420,(UMBC,4))

Kentucky did not lose any of its 34 games in the regular season. Too bad they could not make it to the championship final.
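The mirror image of numWinsOfTeams reads the loser's name instead, so it only needs the destination side of each triplet, that is, TripletFields.Dst. The sketch below assumes, as in the code above, that the vertex attribute is the team name and that edges point from winner to loser; the object and method names (LossesOfTeams, lossesOfTeams) are hypothetical, and the edge attribute type is left generic.

```scala
import org.apache.spark.graphx.{Graph, TripletFields, VertexRDD}

// Hypothetical helper: losses per team, labelled with the team name
object LossesOfTeams {
  def lossesOfTeams[ED](g: Graph[String, ED]): VertexRDD[(String, Int)] =
    g.aggregateMessages[(String, Int)](
      t => t.sendToDst((t.dstAttr, 1)),   // only the destination (loser) attribute is read
      (x, y) => (x._1, x._2 + y._2),      // keep the name, add up the counts
      TripletFields.Dst
    )
}
```

With scoreGraph from this chapter, a call such as LossesOfTeams.lossesOfTeams(scoreGraph).sortBy(_._2._2, false).take(5).foreach(println) would list the teams with the most losses by name. Note that aggregateMessages only returns vertices that received at least one message, so an undefeated team such as Kentucky is absent from this result, just as winless teams would be absent from numWinsOfTeams.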
