Getting started with GraphX

You don't need to install any additional software to get started with GraphX; it is included in the Spark installation. This section introduces how to create and explore graphs using a simple family relationship graph. The family graph created here will be used in all the operations within this section.

Basic operations of GraphX

GraphX does not support the Python API yet. For ease of understanding, let's use spark-shell to work with GraphX interactively. First of all, let's create the input data (vertex and edge files) needed for our GraphX operations and then store it on HDFS.

Note

All programs in this chapter are executed on CDH 5.8 VM. For other environments, file paths might change, but the concepts are the same in any environment.

Creating a graph

We can create a graph using the following steps:

  1. Create a vertex file with vertex ID, name, and age as shown here:
    [cloudera@quickstart ~]$ cat vertex.csv
    1,Jacob,48
    2,Jessica,45
    3,Andrew,25
    4,Ryan,53
    5,Emily,22
    6,Lily,52
    
  2. Create an edge file with vertex ID, destination vertex ID, and relationship as follows:
    [cloudera@quickstart ~]$ cat edges.csv 
    6,1,Sister
    1,2,Husband
    2,1,Wife
    5,1,Daughter
    5,2,Daughter
    3,1,Son
    3,2,Son
    4,1,Friend
    1,5,Father
    1,3,Father
    2,5,Mother
    2,3,Mother
    
  3. Now, let's copy both files to HDFS:
    [cloudera@quickstart ~]$ hadoop fs -put vertex.csv
    [cloudera@quickstart ~]$ hadoop fs -put edges.csv
    
  4. Start a Scala shell with yarn as the master and then import the GraphX and RDD dependencies:
    [cloudera@quickstart ~]$ spark-shell --master yarn
    
    scala> sc.master
    res0: String = yarn
    
    scala> import org.apache.spark.graphx._
    scala> import org.apache.spark.rdd.RDD
    
  5. Create an RDD for both vertex and edge files:
    scala> val vertexRDD = sc.textFile("vertex.csv")
    
    vertexRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:25
    
    scala> vertexRDD.collect()
    
    res1: Array[String] = Array(1,Jacob,48, 2,Jessica,45, 3,Andrew,25, 4,Ryan,53, 5,Emily,22, 6,Lily,52)
    
    scala> val edgeRDD =  sc.textFile("edges.csv")
    edgeRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:25
    
    scala> edgeRDD.collect()
    res2: Array[String] = Array(6,1,Sister, 1,2,Husband, 2,1,Wife, 5,1,Daughter, 5,2,Daughter, 3,1,Son, 3,2,Son, 4,1,Friend, 1,5,Father, 1,3,Father, 2,5,Mother, 2,3,Mother)
    
  6. Let's create the vertices RDD with VertexId keys and a pair of strings representing each person's name and age:
    scala> val vertices: RDD[(VertexId, (String, String))] =
              vertexRDD.map { line =>
                val fields = line.split(",")
                (fields(0).toLong, (fields(1), fields(2)))
              }
    vertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String))] = MapPartitionsRDD[4] at map at <console>:28
    
    scala> vertices.collect()
    res3: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((1,(Jacob,48)), (2,(Jessica,45)), (3,(Andrew,25)), (4,(Ryan,53)), (5,(Emily,22)), (6,(Lily,52)))
    
  7. Let's create the EdgeRDD, with the source and destination vertex IDs converted to Long values and the relationship kept as a String. Each record in this RDD is now an Edge record:
    scala> val edges: RDD[Edge[String]] =
              edgeRDD.map { line =>
                val fields = line.split(",")
                Edge(fields(0).toLong, fields(1).toLong, fields(2))
              }
    edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[5] at map at <console>:28
    
    scala> edges.collect()
    res4: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(6,1,Sister), Edge(1,2,Husband), Edge(2,1,Wife), Edge(5,1,Daughter), Edge(5,2,Daughter), Edge(3,1,Son), Edge(3,2,Son), Edge(4,1,Friend), Edge(1,5,Father), Edge(1,3,Father), Edge(2,5,Mother), Edge(2,3,Mother))
    
  8. We should define a default vertex attribute to use when an edge refers to a missing vertex. The graph is then constructed from the vertices, the edges, and this default attribute:
    scala> val default = ("Unknown", "Missing")
    default: (String, String) = (Unknown,Missing)
    
    scala> val graph = Graph(vertices, edges, default)
    graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@284ee98f
    

This creates a GraphX graph called graph, which can now be used for multiple operations. Remember that, although these data samples are small, you can create extremely large graphs using the same approach. Many graph algorithms, such as PageRank and triangle counting, are iterative, so the programs will generate many Spark jobs. Just like Spark's RDDs, graphs can be cached and un-cached in memory for better performance in iterative processing.
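
As a minimal sketch (the cachedGraph variable name is our own), a graph can be cached before iterative work and released afterwards:

scala> val cachedGraph = graph.cache()  // persist the vertex and edge RDDs in memory

scala> // ... run iterative algorithms such as pageRank on cachedGraph ...

scala> cachedGraph.unpersist()          // release the cached partitions when done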

Counting

It is easy to count the number of vertices and edges in a graph with the following functions:

scala> println( "vertices count : " + graph.vertices.count )
vertices count : 6
scala> println( "edges count : " + graph.edges.count )
edges count : 12

Alternatively, counting can be done as follows:

scala> println(s"The graph has ${graph.numVertices} vertices")
The graph has 6 vertices
scala> println(s"The graph has ${graph.numEdges} edges")
The graph has 12 edges

Filtering

We can filter graphs to create subgraphs. For example, let's filter the vertices to find the people whose age is greater than 40 years, as shown here:

scala> val cnt1 = graph.vertices.filter { case (id, (name, age)) => age.toLong > 40 }.count
cnt1: Long = 4

scala> println( "Vertices count : " + cnt1 )
Vertices count : 4

Now, filter the edges whose relationship property is Mother or Father and then print the count:

scala> val cnt2 = graph.edges.filter { case Edge(from, to, property) => property == "Father" || property == "Mother" }.count
cnt2: Long = 4

scala> println( "Edges count : " + cnt2 )
Edges count : 4

inDegrees, outDegrees, and degrees

GraphX also exposes the degrees of nodes through the inDegrees, outDegrees, and degrees members. The degree of a node is the number of links it has to other nodes: the in-degree is the number of incoming links, and the out-degree is the number of nodes that it points to. Let's find the highest-degree vertex by defining a max function to use in a reduce operation:

scala> def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
         if (a._2 > b._2) a else b
       }
max: (a: (org.apache.spark.graphx.VertexId, Int), b: (org.apache.spark.graphx.VertexId, Int))(org.apache.spark.graphx.VertexId, Int)

scala> val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
maxInDegree: (org.apache.spark.graphx.VertexId, Int) = (1,5)

scala> val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
maxOutDegree: (org.apache.spark.graphx.VertexId, Int) = (1,3)

scala> val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)
maxDegrees: (org.apache.spark.graphx.VertexId, Int) = (1,8)

We can filter the out-degrees to find the people with the fewest outgoing links (at most one, in this case), as follows:

scala> val minDegrees = graph.outDegrees.filter(_._2 <= 1)
minDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[62] at RDD at VertexRDD.scala:57
scala> minDegrees.collect()
Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (6,1))

Triplets

GraphX also exposes a triplet view, which logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]]. Figure 9.2 expresses a triplet join graphically:

Figure 9.2: Triplet view of a graph

Let's print the triplets of our graph by using the srcAttr, attr, and dstAttr members:

scala> graph.triplets.map(triplet =>
         triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
       ).collect.foreach(println)

Jacob is the Husband of Jessica
Jessica is the Wife of Jacob
Andrew is the Son of Jacob
Emily is the Daughter of Jacob
Emily is the Daughter of Jessica
Lily is the Sister of Jacob
Jacob is the Father of Andrew
Jacob is the Father of Emily
Jessica is the Mother of Andrew
Jessica is the Mother of Emily
Andrew is the Son of Jessica
Ryan is the Friend of Jacob

Transforming graphs

Graph operators can be used to either change the properties of graph elements or modify the structure of graphs. All the operators that we use in this section are methods that are invoked on a graph and return a new graph. Let's examine join methods to combine graph data with other datasets and perform data operations on VertexRDD and EdgeRDD.

Transforming attributes

map is one of the main transformation functions in Spark for transforming RDDs. Similarly, graphs also have three map operators, as follows:

class Graph[VD, ED] {
  def mapVertices[VD2](mapFun: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](mapFun: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](mapFun: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

Each of these methods is called on a graph with vertex attribute type VD and edge attribute type ED. Each of them also takes a user-defined mapping function, say myMap, that performs one of the following (see the sketch after this list):

  • For mapVertices, myMap takes a pair of (VertexId, VD) as input and returns a transformed vertex attribute of type VD2
  • For mapEdges, myMap takes an Edge object as input and returns a transformed edge attribute of type ED2
  • For mapTriplets, myMap takes an EdgeTriplet object as input and returns a transformed edge attribute of type ED2
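
As a quick sketch of these operators on our family graph (the variable names are our own), the following converts the age attribute to Int, upper-cases the relationships, and labels each edge with the names of its endpoints:

scala> // mapVertices: convert the age attribute from String to Int
scala> val typedGraph = graph.mapVertices { (id, attr) => (attr._1, attr._2.toInt) }

scala> // mapEdges: upper-case every relationship string
scala> val upperGraph = graph.mapEdges(e => e.attr.toUpperCase)

scala> // mapTriplets: label each edge with the names of both endpoints
scala> val labelGraph = graph.mapTriplets(t => t.srcAttr._1 + "-" + t.attr + "-" + t.dstAttr._1)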

Modifying graphs

The GraphX library also provides four useful methods for changing the structure of graphs. Their method signatures are listed as follows:

class Graph[VD, ED] { 
  def reverse: Graph[VD, ED] 
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, 
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED] 
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] 
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] 
}

The operators used in the preceding code are explained as follows, with a short example after the list:

  • reverse: If you want to reverse the edge directions, the reverse operator can be used. It does not move any data and does not change vertex or edge properties. This is useful, for example, for inverse PageRank.
  • subgraph: The subgraph operator is useful for filtering graphs that take two predicate functions as arguments, which return Boolean values. The first predicate epred takes an EdgeTriplet and returns true when the triplet satisfies the predicate. The second predicate vpred takes a pair of (VertexId, VD) and returns true when the vertex satisfies the predicate condition.
  • mask: Just like subgraph, the mask operator also filters a graph. But, mask does not take predicate functions as arguments. It takes another graph as an argument and then the expression graph1.mask(graph2) constructs a sub-graph of graph1 by returning a graph that contains the vertices and edges that are also found in graph2. This is typically used together with the subgraph operator to filter a graph based on the properties in another related graph.
  • groupEdges: The groupEdges operator merges duplicate edges between each pair of nodes into a single edge. This reduces the size of the graph in many applications.
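
As a short sketch on our family graph (the filter condition and variable names are our own), subgraph can drop the Friend edge, reverse can flip all edge directions, and mask can restrict the original graph to the filtered structure:

scala> // subgraph: keep everything except the Friend relationship
scala> val familyOnly = graph.subgraph(epred = t => t.attr != "Friend")

scala> // reverse: flip every edge direction
scala> val reversed = graph.reverse

scala> // mask: keep only the vertices and edges that also appear in familyOnly
scala> val masked = graph.mask(familyOnly)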

Joining graphs

GraphX also provides APIs for joining RDD datasets with graphs. This is really useful when we want to add extra information to, or merge attributes into, the vertex attributes. These tasks can be accomplished using the joinVertices and outerJoinVertices join operators. Let's explore an example that joins a movies RDD with the graph we created previously:

scala> case class MoviesWatched(Movie: String, Genre: String)

scala> val movies: RDD[(VertexId, MoviesWatched)] = sc.parallelize(List(
         (1L, MoviesWatched("Toy Story 3", "kids")),
         (2L, MoviesWatched("Titanic", "Love")),
         (3L, MoviesWatched("The Hangover", "Comedy"))))

scala> val movieOuterJoinedGraph = graph.outerJoinVertices(movies)((_, name, movies) => (name, movies))

scala> movieOuterJoinedGraph.vertices.collect.foreach(println)

(4,((Ryan,53),None))
(6,((Lily,52),None))
(2,((Jessica,45),Some(MoviesWatched(Titanic,Love))))
(1,((Jacob,48),Some(MoviesWatched(Toy Story 3,kids))))
(3,((Andrew,25),Some(MoviesWatched(The Hangover,Comedy))))
(5,((Emily,22),None))

You can see that movie information is added for Jacob, Jessica, and Andrew. We can use the getOrElse method to provide default attribute values for the vertices that are not present in the movies RDD:

scala> val movieOuterJoinedGraph = graph.outerJoinVertices(movies)((_, name, movies) =>
         (name, movies.getOrElse(MoviesWatched("NA", "NA"))))

scala> movieOuterJoinedGraph.vertices.collect.foreach(println)

(4,((Ryan,53),MoviesWatched(NA,NA)))
(6,((Lily,52),MoviesWatched(NA,NA)))
(2,((Jessica,45),MoviesWatched(Titanic,Love)))
(1,((Jacob,48),MoviesWatched(Toy Story 3,kids)))
(3,((Andrew,25),MoviesWatched(The Hangover,Comedy)))
(5,((Emily,22),MoviesWatched(NA,NA)))

VertexRDD and EdgeRDD operations

All of the graph operations in the previous sections were invoked on a graph and returned a new graph object. In this section, let's learn about operations that transform VertexRDD and EdgeRDD collections. The types of these collections are subtypes of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively.

Mapping VertexRDD and EdgeRDD

mapValues takes a map function as input and transforms each vertex attribute in the VertexRDD, returning a new VertexRDD while preserving the original vertex indices. mapValues is overloaded so that the map function can take either VD or (VertexId, VD) as input. The type of the new vertex attributes can be different from VD:

def mapValues[VD2](map: VD => VD2): VertexRDD[VD2] 
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]

Similarly, mapValues on an EdgeRDD transforms the edge attributes while preserving the structure:

def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
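
A minimal sketch on our family graph (the variable names are our own):

scala> // Keep only the name part of each vertex attribute
scala> val names = graph.vertices.mapValues((id, attr) => attr._1)

scala> // Lower-case every relationship attribute
scala> val lowerEdges = graph.edges.mapValues(e => e.attr.toLowerCase)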

Filtering VertexRDDs

The filter method filters VertexRDD collections without changing the vertex indexing. filter is not overloaded, and the type of the predicate must be (VertexId, VD) => Boolean:

def filter(pred: (VertexId, VD) => Boolean): VertexRDD[VD]
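
For instance, we can keep only the people whose name starts with "J" (a predicate of our own choosing); on our family graph the result contains Jacob (1) and Jessica (2):

scala> val jNames = graph.vertices.filter { case (id, (name, age)) => name.startsWith("J") }

scala> jNames.collect()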

The diff operation that follows also filters vertices inside a VertexRDD collection. It removes from this set the vertices that appear in the other set:

def diff(another: VertexRDD[VD]): VertexRDD[VD]

The minus operation returns vertices that are unique to a set based on their VertexId:

def minus(another: RDD[(VertexId, VD)]): VertexRDD[VD]
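
As a sketch (the split into seniors is our own), minus can return the vertices of our graph that are not in a given set:

scala> val seniors = graph.vertices.filter { case (_, (_, age)) => age.toLong > 40 }

scala> // Vertices not present in seniors: Andrew (3) and Emily (5)
scala> val juniors = graph.vertices.minus(seniors)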

Joining VertexRDDs

Two join operations, innerJoin and leftJoin, are optimized for VertexRDD collections with internal indexing to accelerate joins:

def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] 
def leftJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, Option[U]) => VD2): VertexRDD[VD2]
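
For example, a sketch reusing the movies RDD from the Joining graphs section (the variable names are our own):

scala> // leftJoin keeps every vertex; missing movie entries become "NA"
scala> val withMovies = graph.vertices.leftJoin(movies) { (id, attr, movieOpt) =>
         (attr._1, movieOpt.map(_.Movie).getOrElse("NA"))
       }

scala> // innerJoin keeps only the vertices present in both sets
scala> val watched = graph.vertices.innerJoin(movies) { (id, attr, m) => (attr._1, m.Movie) }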

Joining EdgeRDDs

The innerJoin operator can be applied for joining two EdgeRDDs as well:

def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] 

This is similar to the innerJoin method for VertexRDD, except that its input function is different and it uses the same partitioning strategy as the original EdgeRDD.
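
As a sketch (the weight of 1 is our own placeholder), we can derive a second EdgeRDD from the same graph and join the two on their source and destination pairs:

scala> val weights = graph.mapEdges(e => 1).edges

scala> // Pair each relationship with its weight, matching edges on (src, dst)
scala> val weightedEdges = graph.edges.innerJoin(weights) { (src, dst, rel, w) => (rel, w) }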

Reversing edge directions

We have seen the reverse operator in the Modifying graphs section, which reverses all the edges of a graph. If you only need to reverse the edges held in an EdgeRDD (for example, a filtered subset), the following reverse method defined on EdgeRDD objects is useful:

def reverse: EdgeRDD[ED]
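
For instance (the variable name is our own):

scala> // Reverse the edges held in the EdgeRDD; the original graph is unchanged
scala> val reversedEdges = graph.edges.reverse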

GraphX algorithms

GraphX provides many algorithms out-of-the-box in version 1.6. Let's learn about PageRank, triangle counting, and connected components using the graph we created in the previous section.

PageRank measures the importance of each vertex in a graph. Our example sets a tolerance value and calls the graph's pageRank method with it. The vertices are then ranked into a new ranking value. To make the ranking more meaningful, the ranking values are joined with the original vertices RDD. The rankByPerson value then contains the rank, vertex ID, and person's name.

Make sure to allocate additional memory (with the --driver-memory parameter) to avoid an out-of-memory exception. You can execute this in local mode, assigning more memory to the driver, as shown in the following command. If you are restarting the shell, make sure to reconstruct the graph by executing the commands shown in the Creating a graph section:

spark-shell --master local[*] --driver-memory 3G 

scala> val tolerance = 0.0001
scala> val ranking = graph.pageRank(tolerance).vertices

scala> val rankByPerson = vertices.join(ranking).map {
          case (id, ( (person,age) , rank )) => (rank, id, person)
          }

The PageRank result, held in rankByPerson, is then printed record by record, using a case statement to identify the record contents, and a format statement to print the contents:

scala> rankByPerson.collect().foreach {
             case (rank, id, person) =>
             println ( f"Rank $rank%1.2f for id $id person $person")
       }

The output from the application is then shown here. As expected, Jacob and Jessica have the highest rank, as they have the most relationships:

Rank 0.15 for id 4 person Ryan
Rank 0.15 for id 6 person Lily
Rank 1.62 for id 2 person Jessica
Rank 1.82 for id 1 person Jacob
Rank 1.13 for id 3 person Andrew
Rank 1.13 for id 5 person Emily

Triangle counting

The triangle count algorithm provides a vertex-based count of the number of triangles associated with each vertex. For instance, vertex Jacob (1) is connected to Emily (5), who is connected to Jessica (2); Jessica, in turn, is connected to Jacob (1), so a triangle is formed. This can be useful for route finding, where minimal triangle-free spanning-tree graphs need to be generated for route planning:

scala> val tCount = graph.triangleCount().vertices
scala> println( tCount.collect().mkString("\n") )
(4,0)
(6,0)
(2,2)
(1,2)
(3,1)
(5,1)

The results of the application job show that the vertices called Lily (6) and Ryan (4) have no triangles, whereas Jacob (1) and Jessica (2) have the most, as expected, as they have the most relationships.

Connected components

When a large graph is created from data, it might contain unconnected subgraphs that are isolated from each other, with no connecting edges between them. The connected components algorithms provide a measure of this connectivity. Depending on your processing, it can be important to know whether all the vertices are connected. Let's use two graph methods, connectedComponents and stronglyConnectedComponents, for this. The strongly connected version requires a maximum iteration count, which has been set to 1000 here. Both methods act on the graph's vertices:

scala> val iterations = 1000
scala> val connected = graph.connectedComponents().vertices
scala> val connectedS = graph.stronglyConnectedComponents(iterations).vertices

The component IDs are then joined with the original vertex records, so that each component can be associated with vertex information, such as the person's name:

scala> val connByPerson = vertices.join(connected).map {
         case (id, ((person, age), conn)) => (conn, id, person)
       }

scala> val connByPersonS = vertices.join(connectedS).map {
         case (id, ((person, age), conn)) => (conn, id, person)
       }

The results are then output using a case statement and formatted printing:

scala> connByPerson.collect().foreach {
         case (conn, id, person) =>
         println ( f"Weak $conn $id $person" )
       }

Weak 1 4 Ryan
Weak 1 6 Lily
Weak 1 2 Jessica
Weak 1 1 Jacob
Weak 1 3 Andrew
Weak 1 5 Emily

As expected for the connectedComponents algorithm, the results show that all the vertices belong to a single component.

The stronglyConnectedComponents algorithm gives a measure of the connectivity in a graph, taking into account the direction of the relationships between vertices:

scala> connByPersonS.collect().foreach {
         case (conn, id, person) =>
         println ( f"Strong $conn $id $person" )
       }

The result for the stronglyConnectedComponents algorithm output is as follows:

Strong 4 4 Ryan
Strong 6 6 Lily
Strong 1 2 Jessica
Strong 1 1 Jacob
Strong 1 3 Andrew
Strong 1 5 Emily

So, the strong method output shows that most vertices belong to a single graph component, signified by the 1 in the second column. However, vertices 4 and 6 are not reachable in both directions because of the direction of their relationships, so each forms its own single-vertex component, identified by its own vertex ID.
