Data operations on VertexRDD and EdgeRDD

All of the operations we've seen previously are graph operations. They are invoked on a graph and they return a new graph object. In this section, we will introduce operations that transform VertexRDD and EdgeRDD collections. The types of these collections are subtypes of RDD[(VertexId, VD)] and RDD[Edge[ED]], respectively.

Mapping VertexRDD and EdgeRDD

First, mapValues takes a map function as input and applies it to each vertex attribute in the VertexRDD. It returns a new VertexRDD object while preserving the original vertex indices. The method is overloaded so that the map function can take an input of type VD or (VertexId, VD). The type of the new vertex attributes, VD2, can differ from VD:

def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]

For illustration, let's take the biographies of the previous Hollywood stars in a VertexRDD collection:

scala> val actorsBio = movieJoinedGraph.vertices
actorsBio: VertexRDD[String] 

scala> actorsBio.foreach(println)
(4,Matt Damon:Boston, Massachusetts, USA)
(1,George Clooney)
(5,Salma Hayek:Coatzacoalcos, Veracruz, Mexico)
(3,Will Smith:Philadelphia, Pennsylvania, USA)
(2,Julia Stiles:New York City, New York, USA)

Now, we can use mapValues to extract their names into a new VertexRDD collection:

scala> actorsBio.mapValues(s => s.split(':')(0)).foreach(println)
(2,Julia Stiles)
(1,George Clooney)
(5,Salma Hayek)
(4,Matt Damon)
(3,Will Smith)

Using the overloaded mapValues method, we can include the vertex IDs in the input of the map function and still get a similar result:

scala> actorsBio.mapValues((vid,s) => s.split(':')(0)).foreach(println)
(1,George Clooney)
(5,Salma Hayek)
(3,Will Smith)
(4,Matt Damon)
(2,Julia Stiles)

There is also one mapValues method for transforming EdgeRDDs:

def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]

Similarly, mapValues changes only the edge attributes. It does not remove or add edges, nor does it modify the direction of the edges.
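As a minimal sketch of the EdgeRDD variant, the following snippet builds a small graph from hypothetical toy data and maps each movie title to its length. It assumes Spark 1.4 or later (for SparkContext.getOrCreate and the single-parameter EdgeRDD[ED] type); the names toyGraph and titleLengths are ours and do not appear elsewhere in this chapter:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("edge-mapvalues-sketch"))

// Hypothetical toy data: two actors linked by one movie.
val toyVertices = sc.parallelize(Seq((1L, "George Clooney"), (4L, "Matt Damon")))
val toyEdges    = sc.parallelize(Seq(Edge(1L, 4L, "Ocean's Eleven")))
val toyGraph    = Graph(toyVertices, toyEdges)

// Map each title to its length; srcId and dstId are carried over unchanged.
val titleLengths: EdgeRDD[Int] = toyGraph.edges.mapValues(e => e.attr.length)

titleLengths.collect.foreach(println)
```

The resulting edge still goes from vertex 1 to vertex 4; only its attribute has changed from a String to an Int.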

Filtering VertexRDDs

Using the filter method, we can also filter VertexRDD collections. Without changing the vertex indexing, filter removes the vertices that do not satisfy a user-defined predicate. Unlike mapValues, filter is not overloaded: it overrides the filter method of RDD, so the predicate receives each vertex as a single (VertexId, VD) pair and must have the type Tuple2[VertexId, VD] => Boolean. This is summarized as follows:

def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
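A short sketch of filter on hypothetical toy data is shown next. The predicate is written as a pattern-matching function literal, which destructures each vertex into its ID and attribute. The names toyBios and withHometown are ours, and the snippet assumes Spark 1.4 or later for SparkContext.getOrCreate:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("vertex-filter-sketch"))

// Hypothetical toy data in the "name:hometown" format used earlier.
val toyBios: VertexRDD[String] = VertexRDD(sc.parallelize(Seq(
  (1L, "George Clooney"),
  (4L, "Matt Damon:Boston, Massachusetts, USA"))))

// Keep only the vertices whose biography includes a hometown.
val withHometown: VertexRDD[String] =
  toyBios.filter { case (vid, bio) => bio.contains(":") }

withHometown.collect.foreach(println)
```

Only vertex 4 survives the filter, because the attribute of vertex 1 contains no colon.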

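In addition to filter, the diff operation also filters vertices inside a VertexRDD collection. It takes another VertexRDD set as input and removes vertices from the original set that are also in the input set. Note that the ScalaDoc of diff in the Spark source describes a narrower behavior (for vertices present in both sets, it returns only those whose attribute values differ), so it is worth checking the result on your Spark version before relying on it: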

def diff(other: VertexRDD[VD]): VertexRDD[VD]

Note

GraphX does not provide a similar filter operation for EdgeRDD collections because filtering edges can be directly and efficiently achieved using the graph operation subgraph. See the previous section on Modifying graph structures.

Joining VertexRDDs

The following join operators are optimized for VertexRDD collections:

def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

def leftJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, Option[U]) => VD2): VertexRDD[VD2]

The first operator is innerJoin, which takes an RDD of (VertexId, U) pairs (possibly another VertexRDD) and a user-defined function f as inputs. Using this function, it joins the attributes of vertices that are present in both the original VertexRDD and the input RDD. In other words, innerJoin returns the intersection set of vertices and merges their attributes according to f.

So, given the vertex RDD from movieGraph, the result of innerJoin with the RDD of biographies will not contain George Clooney, Paul Walker or José Antonio Domínguez Banderas:

scala> val actors = movieGraph.vertices
actors: VertexRDD[String]

scala> actors.innerJoin(bio)((vid, name, b) => name + " is from " + b.hometown).foreach(println)
(4,Matt Damon is from Boston, Massachusetts, USA)
(5,Salma Hayek is from Coatzacoalcos, Veracruz, Mexico)
(2,Julia Stiles is from New York City, New York, USA)
(3,Will Smith is from Philadelphia, Pennsylvania, USA)

The second operator, leftJoin, is similar to the operator outerJoinVertices defined in Graph[VD,ED]. In addition to an input RDD, it takes a user-defined function f of type (VertexId, VD, Option[U]) => VD2. The resulting VertexRDD contains the same vertices as the original VertexRDD. Since the third input of the function f is an Option[U], f must handle the case where a vertex in the original VertexRDD set is not present in the input RDD. Using the previous example, we would do something like:

scala> actors.leftJoin(bio)((vid, name, b) => b match {
    case Some(bio) => name + " is from " + bio.hometown
    case None => name + "'s hometown is unknown"
}).foreach(println)

(4,Matt Damon is from Boston, Massachusetts, USA)
(1,George Clooney's hometown is unknown)
(5,Salma Hayek is from Coatzacoalcos, Veracruz, Mexico)
(2,Julia Stiles is from New York City, New York, USA)
(3,Will Smith is from Philadelphia, Pennsylvania, USA)
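To compare the two joins side by side without depending on the earlier bio collection, here is a self-contained sketch on hypothetical toy data. The names toyActors and toyHometowns, as well as the hometown of vertex 9, are made up for illustration, and the snippet assumes Spark 1.4 or later for SparkContext.getOrCreate:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("vertex-join-sketch"))

val toyActors: VertexRDD[String] = VertexRDD(sc.parallelize(Seq(
  (1L, "George Clooney"), (4L, "Matt Damon"))))

// Vertex 9 has a hometown but is not in toyActors; vertex 1 has no hometown.
val toyHometowns: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (4L, "Boston"), (9L, "Somewhere")))

// innerJoin keeps only the vertices present on both sides (vertex 4).
val inner = toyActors.innerJoin(toyHometowns)(
  (vid, name, town) => name + " is from " + town)

// leftJoin keeps every vertex of toyActors and passes None for vertex 1.
val left = toyActors.leftJoin(toyHometowns)(
  (vid, name, town) => name + " is from " + town.getOrElse("an unknown place"))

inner.collect.foreach(println)
left.collect.foreach(println)
```

In both cases vertex 9 is dropped, since neither operator adds vertices that are missing from the original VertexRDD.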

Joining EdgeRDDs

In GraphX, there is a join operator, innerJoin, for joining two EdgeRDD collections:

def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

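It is similar to the innerJoin method for VertexRDD, except that its input function f now has the type (VertexId, VertexId, ED, ED2) => ED3, so it receives the source ID, the destination ID, and the two edge attributes. Moreover, innerJoin uses the same partitioning strategy as the original EdgeRDD and assumes that the other EdgeRDD is partitioned in the same way.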
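The following sketch joins an EdgeRDD of movie titles with an EdgeRDD of ratings. The ratings are made up for illustration, and the second EdgeRDD is derived from the first with mapValues, which is one simple way to make sure that both sides share the same partitioning. The names toyMovies, toyRatings, and madeUpRatings are ours, and the snippet assumes Spark 1.4 or later:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("edge-join-sketch"))

// Hypothetical toy data: two movie edges between actor IDs.
val toyMovies: EdgeRDD[String] = Graph.fromEdges(
  sc.parallelize(Seq(
    Edge(1L, 4L, "Ocean's Eleven"),
    Edge(3L, 5L, "Wild Wild West"))),
  defaultValue = "").edges

// Made-up ratings, attached to the same edges via mapValues so that
// both EdgeRDDs have identical partitioning.
val madeUpRatings = Map("Ocean's Eleven" -> 4, "Wild Wild West" -> 2)
val toyRatings: EdgeRDD[Int] = toyMovies.mapValues(e => madeUpRatings(e.attr))

// f receives (srcId, dstId, attribute of this, attribute of other).
val labelled: EdgeRDD[String] = toyMovies.innerJoin(toyRatings)(
  (src, dst, title, rating) => title + " rated " + rating)

labelled.collect.foreach(println)
```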

Reversing edge directions

Previously, we have seen the reverse operation that reverses all the edges of a graph. When we want to reverse only a subset of edges in a graph, the following reverse method defined on EdgeRDD objects becomes useful:

def reverse: EdgeRDD[ED]

For instance, we know that property graphs must be directed in Spark. The only way to model an undirected graph is to add a reverse link for each edge. This can easily be done using the reverse operator as follows. First, we extract the edges of the movie graph into the EdgeRDD movies:

scala> val movies = movieGraph.edges
movies: EdgeRDD[String,String] 

scala> movies.foreach(println)
Edge(1,4,Ocean's Eleven)
Edge(3,5,Wild Wild West)
Edge(2,4,Bourne Ultimatum)
Edge(1,5,From Dusk Till Dawn)
Edge(3,4,The Legend of Bagger Vance)

Next, we create a new EdgeRDD collection with the links reversed and obtain the bidirected graph from the union of the two EdgeRDD collections:

scala> val bidirectedGraph = Graph(actors, movies union 
       movies.reverse)

We can see that this works by printing the new set of edges:

scala> bidirectedGraph.edges.foreach(println)
Edge(1,5,From Dusk Till Dawn)
Edge(3,4,The Legend of Bagger Vance)
Edge(3,5,Wild Wild West)
Edge(1,4,Ocean's Eleven)
Edge(2,4,Bourne Ultimatum)
Edge(4,1,Ocean's Eleven)
Edge(4,2,Bourne Ultimatum)
Edge(5,3,Wild Wild West)
Edge(4,3,The Legend of Bagger Vance)
Edge(5,1,From Dusk Till Dawn)

Note

EdgeRDD[ED] is a subtype of RDD[Edge[ED]] and it organizes the edges into blocks partitioned using one of the partitioning strategies defined in PartitionStrategy. The edge attributes and adjacency structure are stored separately within each partition so that the structure can be reused when only the edge attributes are changed.

In Spark 1.0 and 1.1, the type signature of EdgeRDD was changed to EdgeRDD[ED, VD] for optimization purposes. Since Spark 1.2, the signature has switched back to the simpler EdgeRDD[ED] type definition, with the caching optimization implemented in a different way.

Collecting neighboring information

When doing graph computations, we may want to use neighboring information, such as the attributes of neighboring vertices. Two operators, collectNeighborIds and collectNeighbors, explicitly allow us to do that. collectNeighborIds collects into a VertexRDD only the vertex IDs of each node's neighbors, whereas collectNeighbors also collects their attributes:

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

These two methods are invoked on a property graph and take an EdgeDirection as input. An EdgeDirection can take four possible values:

  • EdgeDirection.In: Each vertex collects only the attributes of the neighbors that link to it, that is, the sources of its incoming edges
  • EdgeDirection.Out: Each vertex collects only the attributes of neighbors that it links to
  • EdgeDirection.Either: Each vertex collects the attributes of all its neighbors
  • EdgeDirection.Both: Each vertex collects the attributes of the neighbors with which it has both an incoming edge and an outgoing one
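To make the directions concrete, the sketch below runs collectNeighborIds on a hypothetical three-vertex chain 1 -> 2 -> 3. The graph chain and the helper neighborIds are ours, not part of GraphX, and the snippet assumes Spark 1.4 or later. The fourth direction is left out on purpose: some Spark releases reject it for these two operators and suggest using the Either direction instead, so verify its behavior against your version:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("edge-direction-sketch"))

// Hypothetical chain 1 -> 2 -> 3; the edge and vertex attributes are unused.
val chain = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1))), defaultValue = 0)

// Helper (ours): bring the result to the driver as sorted ID lists.
def neighborIds(dir: EdgeDirection): Map[VertexId, List[VertexId]] =
  chain.collectNeighborIds(dir).collect
    .map { case (vid, ids) => vid -> ids.sorted.toList }
    .toMap

val in     = neighborIds(EdgeDirection.In)     // sources of incoming edges
val out    = neighborIds(EdgeDirection.Out)    // destinations of outgoing edges
val either = neighborIds(EdgeDirection.Either) // union of the two

println(in)
println(out)
println(either)
```

Vertex 2 sees only vertex 1 with In, only vertex 3 with Out, and both with Either. Vertices without neighbors in the requested direction are still present in the result, with an empty array.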

Tip

For optimal performance, it is best to avoid using these two operators and rewrite the computation using the more generic and efficient aggregateMessages operator presented in the next chapter. The efficiency gain can be substantial, especially when implementing an iterative graph-parallel algorithm. But for simple graph transformations that are done only once, it is fine to use collectNeighbors and collectNeighborIds.
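As a preview only, here is one way the incoming-neighbor collection could be expressed with aggregateMessages. This is a sketch under our own assumptions (Spark 1.4 or later, a hypothetical chain 1 -> 2 -> 3), not a drop-in replacement: unlike collectNeighborIds, the result contains only the vertices that receive at least one message:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("aggregate-messages-sketch"))

// Hypothetical chain 1 -> 2 -> 3; the edge and vertex attributes are unused.
val chain = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1))), defaultValue = 0)

val inNeighbors: VertexRDD[Array[VertexId]] =
  chain.aggregateMessages[Array[VertexId]](
    ctx => ctx.sendToDst(Array(ctx.srcId)), // each edge reports its source to its destination
    (a, b) => a ++ b)                       // concatenate the IDs arriving at the same vertex

inNeighbors.collect.foreach { case (vid, ids) =>
  println(vid + " <- " + ids.mkString(","))
}
```

Vertex 1 has no incoming edge, so it does not appear in inNeighbors at all.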

Example – from food network to flavor pairing

In Chapter 2, Building and Exploring Graphs, we presented the food ingredient dataset and built a bipartite graph that connects each food ingredient to its compounds. In the following, we will build another graph, which consists of only food ingredients. A pair of food ingredients is connected in the new graph only if they share at least one compound. We'll call this new graph the flavor network. We can later use this graph to create new recipes by experimenting with new food pairings.

Let's start with the bipartite food network that we built in Chapter 2, Building and Exploring Graphs:

scala> val nodes = ingredients ++ compounds
scala> val foodNetwork = Graph(nodes, links)
foodNetwork: Graph[Node,Int]

To create the new flavor network, we need to know which ingredients share some compounds. This can be done by first collecting the ingredient IDs for each compound node in the foodNetwork graph. Concretely, we collect and group the IDs of the ingredients that contain the same compound into an RDD collection of tuples (compound id, Array[ingredient id]), as follows:

scala> val similarIngr: RDD[(VertexId, Array[VertexId])] = 
foodNetwork.collectNeighborIds(EdgeDirection.In)
similarIngr: RDD[(VertexId, Array[VertexId])]

Next, we create a function pairIngredients that takes one such tuple of (compound id, Array[ingredient id]) and creates an edge between every pair of ingredients in the array:

scala> def pairIngredients(ingPerComp: (VertexId, Array[VertexId])): Seq[Edge[Int]] =
    for {
        x <- ingPerComp._2
        y <- ingPerComp._2
        if x != y
    } yield Edge(x, y, 1)
pairIngredients: (ingPerComp: (VertexId, Array[VertexId]))Seq[Edge[Int]]
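To see what pairIngredients produces, we can apply it to a single hypothetical tuple: a compound with ID 100 that is found in ingredients 1, 2, and 3 (these IDs are made up for illustration). The function is repeated here so that the snippet stands alone; it only needs the GraphX classes on the classpath, not a running SparkContext:

```scala
import org.apache.spark.graphx.{Edge, VertexId}

// Same definition as above, repeated so that the snippet is self-contained.
def pairIngredients(ingPerComp: (VertexId, Array[VertexId])): Seq[Edge[Int]] =
  for {
    x <- ingPerComp._2
    y <- ingPerComp._2
    if x != y
  } yield Edge(x, y, 1)

// Hypothetical compound 100 shared by ingredients 1, 2 and 3.
val pairs = pairIngredients((100L, Array(1L, 2L, 3L)))

pairs.foreach(println)
```

Three ingredients yield six directed edges (1 to 2, 1 to 3, 2 to 1, 2 to 3, 3 to 1, and 3 to 2), each with weight 1. In general, a compound shared by n ingredients contributes n × (n - 1) edges, and every pair is linked in both directions.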

Once we have that, we can create an RDD of edges that links every pair of ingredients sharing a compound in the food network, as follows:

scala> val flavorPairsRDD: RDD[Edge[Int]] = similarIngr flatMap pairIngredients
flavorPairsRDD: RDD[Edge[Int]]

Finally, we can create the new flavor network:

scala> val flavorNetwork = Graph(ingredients, flavorPairsRDD).cache
flavorNetwork: Graph[Node,Int]

Let's print the first 20 triplets in flavorNetwork:

scala> flavorNetwork.triplets.take(20).foreach(println)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(9,Ingredient(peanut_butter,plant derivative)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)
((3,Ingredient(mackerel,fish/seafood)),(17,Ingredient(red_bean,vegetable)),1)

It seems mackerel, peanut butter and red beans have something in common. Before we try a new recipe, let's slightly modify the network. Notice that duplicate edges are possible when a pair of ingredients share more than one compound. Suppose we want to group parallel edges between each pair of ingredients into a single edge, which contains the number of shared compounds between the two ingredients. We can do that using the groupEdges method:

scala> val flavorWeightedNetwork = flavorNetwork.partitionBy(PartitionStrategy.EdgePartition2D).groupEdges((x,y) => x+y)
flavorWeightedNetwork: Graph[Node,Int]

Tip

groupEdges requires the graph to be repartitioned because it assumes that identical edges will be co-located on the same partition. Thus, you must call partitionBy prior to grouping the edges.
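The effect of partitionBy followed by groupEdges can be checked on a tiny hypothetical graph with three parallel edges from vertex 1 to vertex 2 and one edge in the opposite direction. The names parallelEdges and merged are ours, and the snippet assumes Spark 1.4 or later:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("group-edges-sketch"))

// Hypothetical multigraph: 1 -> 2 three times, 2 -> 1 once, all with weight 1.
val parallelEdges = Graph.fromEdges(sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(1L, 2L, 1),
  Edge(2L, 1L, 1))), defaultValue = 0)

// Co-locate identical (src, dst) pairs first, then sum their weights.
val merged = parallelEdges
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges(_ + _)

merged.edges.collect.foreach(println)
```

The three parallel edges collapse into a single edge of weight 3, while the edge from 2 to 1 stays separate because groupEdges treats the two directions as different edges. This is also why each ingredient pair appears twice, once per direction, when we list the heaviest edges of the flavor network.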

Now, let's print the 20 pairs of ingredients that share the most compounds:

scala> flavorWeightedNetwork.triplets.
sortBy(t => t.attr, false).take(20).
foreach(t => println(t.srcAttr.name + " and " + t.dstAttr.name + " share " + t.attr + " compounds."))
bantu_beer and beer share 227 compounds.
beer and bantu_beer share 227 compounds.
roasted_beef and grilled_beef share 207 compounds.
grilled_beef and roasted_beef share 207 compounds.
grilled_beef and fried_beef share 200 compounds.
fried_beef and grilled_beef share 200 compounds.
beef and roasted_beef share 199 compounds.
beef and grilled_beef share 199 compounds.
beef and raw_beef share 199 compounds.
beef and fried_beef share 199 compounds.
roasted_beef and beef share 199 compounds.
roasted_beef and raw_beef share 199 compounds.
roasted_beef and fried_beef share 199 compounds.
grilled_beef and beef share 199 compounds.
grilled_beef and raw_beef share 199 compounds.
raw_beef and beef share 199 compounds.
raw_beef and roasted_beef share 199 compounds.
raw_beef and grilled_beef share 199 compounds.
raw_beef and fried_beef share 199 compounds.
fried_beef and beef share 199 compounds.

It is not too surprising that roasted beef and grilled beef have lots of things in common. While the example did not teach us much about culinary arts, it showed that we could mix multiple operators to change a graph into a desired form.
