No additional software installation is needed to get started with GraphX, because GraphX is included in the Spark installation. This section introduces how to create and explore graphs using a simple family relationship graph, which is used in all the operations within this section.
GraphX does not support the Python API yet. For easy understanding, let's use spark-shell to work with GraphX interactively. First, let's create the input data (vertex and edge files) needed for our GraphX operations and store it on HDFS.
We can create a graph using the following steps.

Create a vertex file (ID, name, age) and an edge file (source ID, destination ID, relationship):

[cloudera@quickstart ~]$ cat vertex.csv
1,Jacob,48
2,Jessica,45
3,Andrew,25
4,Ryan,53
5,Emily,22
6,Lily,52
[cloudera@quickstart ~]$ cat edges.csv
6,1,Sister
1,2,Husband
2,1,Wife
5,1,Daughter
5,2,Daughter
3,1,Son
3,2,Son
4,1,Friend
1,5,Father
1,3,Father
2,5,Mother
2,3,Mother

Copy both files to HDFS:

[cloudera@quickstart ~]$ hadoop fs -put vertex.csv
[cloudera@quickstart ~]$ hadoop fs -put edges.csv
Start spark-shell on YARN and import the GraphX and RDD packages:

[cloudera@quickstart ~]$ spark-shell --master yarn
scala> sc.master
res0: String = yarn
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD

Load the vertex and edge files as RDDs of strings:

scala> val vertexRDD = sc.textFile("vertex.csv")
vertexRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:25
scala> vertexRDD.collect()
res1: Array[String] = Array(1,Jacob,48, 2,Jessica,45, 3,Andrew,25, 4,Ryan,53, 5,Emily,22, 6,Lily,52)
scala> val edgeRDD = sc.textFile("edges.csv")
edgeRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:25
scala> edgeRDD.collect()
res2: Array[String] = Array(6,1,Sister, 1,2,Husband, 2,1,Wife, 5,1,Daughter, 5,2,Daughter, 3,1,Son, 3,2,Son, 4,1,Friend, 1,5,Father, 1,3,Father, 2,5,Mother, 2,3,Mother)
Transform the vertex RDD into an RDD of tuples, using VertexId for the ID and strings to represent the person's name and age:

scala> val vertices: RDD[(VertexId, (String, String))] = vertexRDD.map { line =>
  val fields = line.split(",")
  (fields(0).toLong, (fields(1), fields(2)))
}
vertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String))] = MapPartitionsRDD[4] at map at <console>:28
scala> vertices.collect()
res3: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((1,(Jacob,48)), (2,(Jessica,45)), (3,(Andrew,25)), (4,(Ryan,53)), (5,(Emily,22)), (6,(Lily,52)))
Transform the edge RDD into an RDD of Edge objects, with the source and destination vertex IDs as Long values and the relationship as a String. Each record in this RDD is now an Edge record:

scala> val edges: RDD[Edge[String]] = edgeRDD.map { line =>
  val fields = line.split(",")
  Edge(fields(0).toLong, fields(1).toLong, fields(2))
}
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[5] at map at <console>:28
scala> edges.collect()
res4: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(6,1,Sister), Edge(1,2,Husband), Edge(2,1,Wife), Edge(5,1,Daughter), Edge(5,2,Daughter), Edge(3,1,Son), Edge(3,2,Son), Edge(4,1,Friend), Edge(1,5,Father), Edge(1,3,Father), Edge(2,5,Mother), Edge(2,3,Mother))
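Define a default vertex attribute, which is assigned to any vertex that appears in an edge but not in the vertex RDD, and then create the graph:

scala> val default = ("Unknown", "Missing")
default: (String, String) = (Unknown,Missing)
scala> val graph = Graph(vertices, edges, default)
graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@284ee98f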
This creates a GraphX-based structure called graph, which can now be used for multiple operations. Although these data samples are small, you can create extremely large graphs using this approach. Many graph algorithms, such as PageRank and connected components, are iterative, so the programs generate many Spark jobs. Just like Spark's RDDs, graphs can be cached in memory (and unpersisted when no longer needed) for better performance in iterative processing.
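The default attribute matters when an edge refers to a vertex ID that is missing from the vertex RDD. GraphX's Graph constructor gives such vertices the default attribute. The following plain-Scala sketch needs no Spark and models that behavior on in-memory collections. SimpleEdge, parseVertex, parseEdge, buildVertexTable, and the extra vertex ID 7 are hypothetical names and data for illustration only. They are not part of the GraphX API.

```scala
// Plain-Scala model (no Spark) of how Graph(vertices, edges, default) fills in
// attributes for vertex IDs that appear in edges but not in the vertex RDD.

// Hypothetical stand-in for org.apache.spark.graphx.Edge.
case class SimpleEdge(srcId: Long, dstId: Long, attr: String)

// Parse "id,name,age" the same way as the spark-shell map above.
def parseVertex(line: String): (Long, (String, String)) = {
  val fields = line.split(",")
  (fields(0).toLong, (fields(1), fields(2)))
}

// Parse "src,dst,relationship".
def parseEdge(line: String): SimpleEdge = {
  val fields = line.split(",")
  SimpleEdge(fields(0).toLong, fields(1).toLong, fields(2))
}

// Known vertices keep their attributes; IDs seen only in edges get the default.
def buildVertexTable(
    vertices: Seq[(Long, (String, String))],
    edges: Seq[SimpleEdge],
    default: (String, String)): Map[Long, (String, String)] = {
  val known = vertices.toMap
  val referenced = edges.flatMap(e => Seq(e.srcId, e.dstId)).toSet
  val missing = referenced -- known.keySet
  known ++ missing.map(id => (id, default))
}

val vertexLines = Seq("1,Jacob,48", "2,Jessica,45")
// Vertex 7 is a hypothetical ID that is deliberately absent from vertexLines.
val edgeLines = Seq("1,2,Husband", "7,1,Friend")

val table = buildVertexTable(
  vertexLines.map(parseVertex), edgeLines.map(parseEdge), ("Unknown", "Missing"))
```

Here vertex 7 ends up with ("Unknown", "Missing"), which is what the default value in the shell session is for.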
It is easy to count the number of vertices and edges in a graph:

scala> println( "vertices count : " + graph.vertices.count )
vertices count : 6
scala> println( "edges count : " + graph.edges.count )
edges count : 12
Alternatively, counting can be done as follows:
scala> println(s"The graph has ${graph.numVertices} vertices")
The graph has 6 vertices
scala> println(s"The graph has ${graph.numEdges} edges")
The graph has 12 edges
We can filter graphs to create subgraphs. For example, filter the vertices to keep only people older than 40:

scala> val cnt1 = graph.vertices.filter { case (id, (name, age)) => age.toLong > 40 }.count
cnt1: Long = 4
scala> println( "Vertices count : " + cnt1 )
Vertices count : 4
Now, filter the edges on the relationship property of Mother or Father and then print the output:

scala> val cnt2 = graph.edges.filter { case Edge(from, to, property) =>
  property == "Father" || property == "Mother"
}.count
cnt2: Long = 4
scala> println( "Edges count : " + cnt2 )
Edges count : 4
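The counts and filters above can be cross-checked without a cluster. This plain-Scala sketch copies the family data into ordinary collections and applies the same two predicates. The names people and relations are hypothetical. As in the vertex RDD, age is kept as a String and converted inside the predicate.

```scala
// Plain-Scala copies of the family data (no Spark needed).
val people: Seq[(Long, (String, String))] = Seq(
  (1L, ("Jacob", "48")), (2L, ("Jessica", "45")), (3L, ("Andrew", "25")),
  (4L, ("Ryan", "53")), (5L, ("Emily", "22")), (6L, ("Lily", "52")))

val relations: Seq[(Long, Long, String)] = Seq(
  (6L, 1L, "Sister"), (1L, 2L, "Husband"), (2L, 1L, "Wife"), (5L, 1L, "Daughter"),
  (5L, 2L, "Daughter"), (3L, 1L, "Son"), (3L, 2L, "Son"), (4L, 1L, "Friend"),
  (1L, 5L, "Father"), (1L, 3L, "Father"), (2L, 5L, "Mother"), (2L, 3L, "Mother"))

// Same predicate as the vertex filter: age is stored as a String.
val over40 = people.count { case (_, (_, age)) => age.toLong > 40 }

// Same predicate as the edge filter, using the short-circuit || operator.
val parents = relations.count { case (_, _, rel) => rel == "Father" || rel == "Mother" }
```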
GraphX also provides the node degree operators inDegrees, outDegrees, and degrees, each of which returns a VertexRDD[Int]. The degree of a node is the number of links it has to other nodes. The in-degree of a node is the number of its incoming links, and the out-degree is the number of its outgoing links, that is, the number of nodes it points to. Let's compute the maximum degrees by defining a max function and passing it to a reduce operation to find the highest-degree vertex:
scala> def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
max: (a: (org.apache.spark.graphx.VertexId, Int), b: (org.apache.spark.graphx.VertexId, Int))(org.apache.spark.graphx.VertexId, Int)
scala> val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
maxInDegree: (org.apache.spark.graphx.VertexId, Int) = (1,5)
scala> val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
maxOutDegree: (org.apache.spark.graphx.VertexId, Int) = (1,3)
scala> val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
maxDegrees: (org.apache.spark.graphx.VertexId, Int) = (1,8)
To find the people with the lowest out-degree, we can filter the out-degrees and keep the vertices whose out-degree is at most 1:

scala> val minDegrees = graph.outDegrees.filter(_._2 <= 1)
minDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[62] at RDD at VertexRDD.scala:57
scala> minDegrees.collect()
Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (6,1))
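The following plain-Scala sketch (no Spark) recomputes these degrees from the 12 edges so the results above can be checked by hand. Like GraphX's inDegrees and outDegrees, it omits vertices whose degree is zero. It also shows a subtlety of the max reducer. Jacob (1) and Jessica (2) both have out-degree 3, and on ties max returns its second argument, so the vertex that reduce(max) reports for maxOutDegree can depend on element (or partition) order. The edge-list copy is an illustrative assumption, not GraphX code.

```scala
// The 12 family edges as (srcId, dstId); relationship labels don't affect degrees.
val edges: Seq[(Long, Long)] = Seq(
  (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
  (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))

// Vertices with no incoming (or outgoing) edges are simply absent, as in GraphX.
val inDegrees: Map[Long, Int] = edges.groupBy(_._2).map { case (id, es) => (id, es.size) }
val outDegrees: Map[Long, Int] = edges.groupBy(_._1).map { case (id, es) => (id, es.size) }
val degrees: Map[Long, Int] =
  (inDegrees.keySet ++ outDegrees.keySet).map { id =>
    (id, inDegrees.getOrElse(id, 0) + outDegrees.getOrElse(id, 0))
  }.toMap

// Same reducer as the shell session: on ties it keeps the second argument.
def max(a: (Long, Int), b: (Long, Int)): (Long, Int) = if (a._2 > b._2) a else b

val maxIn = inDegrees.toSeq.reduce(max)
val maxDeg = degrees.toSeq.reduce(max)
// Out-degree has a tie (vertices 1 and 2), so only the value is order-independent.
val topOut = outDegrees.values.max
val minOut = outDegrees.filter(_._2 <= 1).keySet
```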
GraphX also exposes a triplet view, which logically joins the vertex and edge properties, yielding an RDD[EdgeTriplet[VD, ED]]. Figure 9.2 illustrates a triplet join graphically.
Let's print the triplets of our graph by using the srcAttr, attr, and dstAttr members:

scala> graph.triplets.map(triplet =>
  triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println)
Jacob is the Husband of Jessica
Jessica is the Wife of Jacob
Andrew is the Son of Jacob
Emily is the Daughter of Jacob
Emily is the Daughter of Jessica
Lily is the Sister of Jacob
Jacob is the Father of Andrew
Jacob is the Father of Emily
Jessica is the Mother of Andrew
Jessica is the Mother of Emily
Andrew is the Son of Jessica
Ryan is the Friend of Jacob
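Conceptually, a triplet is an edge joined with the attributes of its source and destination vertices. This plain-Scala sketch models that join on in-memory collections for three of the family edges. The Triplet case class is a hypothetical stand-in for GraphX's EdgeTriplet, and no Spark is involved.

```scala
// Plain-Scala model of GraphX's triplet view (hypothetical names, no Spark).
case class Triplet(srcId: Long, srcAttr: (String, String),
                   dstId: Long, dstAttr: (String, String), attr: String)

val vertexAttrs: Map[Long, (String, String)] = Map(
  (1L, ("Jacob", "48")), (2L, ("Jessica", "45")), (3L, ("Andrew", "25")),
  (4L, ("Ryan", "53")), (5L, ("Emily", "22")), (6L, ("Lily", "52")))

// A subset of the family edges as (srcId, dstId, relationship).
val familyEdges = Seq((1L, 2L, "Husband"), (2L, 1L, "Wife"), (4L, 1L, "Friend"))

// Each triplet pairs an edge with the attributes of both of its endpoints.
val triplets = familyEdges.map { case (src, dst, rel) =>
  Triplet(src, vertexAttrs(src), dst, vertexAttrs(dst), rel)
}

val sentences = triplets.map(t => t.srcAttr._1 + " is the " + t.attr + " of " + t.dstAttr._1)
sentences.foreach(println)
```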
Graph operators can either change the properties of graph elements or modify the structure of graphs. All the operators that we use in this section are methods that are invoked on a graph and return a new graph. Let's examine the map and structural operators, the join methods that combine graph data with other datasets, and the data operations on VertexRDD and EdgeRDD.
The map function is one of the main transformations for RDDs in Spark. Similarly, graphs have three map operators:

class Graph[VD, ED] {
  def mapVertices[VD2](mapFun: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](mapFun: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](mapFun: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
Each of these methods is called on a graph with the vertex attribute type VD and the edge attribute type ED. Each of them also takes a user-defined mapping function, say myMap, that does one of the following:

- For mapVertices, myMap takes a pair of (VertexId, VD) as input and returns a transformed vertex attribute of type VD2.
- For mapEdges, myMap takes an Edge object as input and returns a transformed edge attribute of type ED2.
- For mapTriplets, myMap takes an EdgeTriplet object as input and returns a transformed edge attribute of type ED2.
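To make the three mapping-function shapes concrete, this plain-Scala sketch (no Spark) applies equivalent functions to in-memory vertex and edge collections. In spark-shell, the first transformation would correspond to something like graph.mapVertices { case (_, (name, age)) => (name, age.toInt) }. The collection names are hypothetical, and the sketch models only the semantics, not how GraphX implements these operators.

```scala
// Hypothetical plain-Scala model of the three map operators (not the GraphX API).
val verts: Map[Long, (String, String)] = Map((1L, ("Jacob", "48")), (2L, ("Jessica", "45")))
val rels: Seq[(Long, Long, String)] = Seq((1L, 2L, "Husband"), (2L, 1L, "Wife"))

// mapVertices: (VertexId, VD) => VD2, here converting the age String to an Int.
val typedVerts: Map[Long, (String, Int)] =
  verts.map { case (id, (name, age)) => (id, (name, age.toInt)) }

// mapEdges: Edge[ED] => ED2, here replacing the label with its length; IDs are unchanged.
val labelLengths: Seq[(Long, Long, Int)] =
  rels.map { case (src, dst, rel) => (src, dst, rel.length) }

// mapTriplets: EdgeTriplet[VD, ED] => ED2, so the new edge value may use vertex attributes.
val described: Seq[(Long, Long, String)] = rels.map { case (src, dst, rel) =>
  (src, dst, s"${verts(src)._1} -> ${verts(dst)._1}: $rel")
}
```

In all three cases the vertex IDs and edge endpoints stay the same, and only the attribute values (and possibly their types) change.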
The GraphX library also provides four useful methods for changing the structure of graphs. Their method signatures are listed as follows:
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
}
The operators used in the preceding code are explained as follows:
- reverse: If you want to reverse the edge directions, use the reverse operator. It does not cause any data movement and does not change vertex or edge properties. It is useful, for example, for computing inverse PageRank.
- subgraph: The subgraph operator filters a graph using two predicate functions that return Boolean values. The first predicate, epred, takes an EdgeTriplet and returns true when the triplet satisfies the predicate. The second predicate, vpred, takes a pair of (VertexId, VD) and returns true when the vertex satisfies the predicate condition. An edge is kept only if it satisfies epred and both of its endpoints satisfy vpred.
- mask: Just like subgraph, the mask operator filters a graph, but it does not take predicate functions as arguments. Instead, it takes another graph: the expression graph1.mask(graph2) constructs a subgraph of graph1 that contains only the vertices and edges also found in graph2, keeping the attributes of graph1. This is typically used together with the subgraph operator to filter a graph based on the properties in another related graph.
- groupEdges: The groupEdges operator merges duplicate (parallel) edges between each pair of vertices into a single edge, using the supplied merge function to combine their attributes. This reduces the size of the graph in many applications. For correct results, the graph must first be partitioned with partitionBy so that duplicate edges are co-located.

GraphX also provides APIs for joining RDD datasets with graphs. This is useful when we want to add extra information to vertex attributes or merge external data into them. These tasks can be accomplished using the joinVertices and outerJoinVertices join operators. joinVertices keeps the vertex attribute type and leaves unmatched vertices unchanged. outerJoinVertices can change the attribute type and passes the joined value to the user function as an Option. Let's explore an example of joining the movies RDD with the graph we previously created:
scala> case class MoviesWatched(Movie: String, Genre: String)
scala> val movies: RDD[(VertexId, MoviesWatched)] = sc.parallelize(List(
  (1L, MoviesWatched("Toy Story 3", "kids")),
  (2L, MoviesWatched("Titanic", "Love")),
  (3L, MoviesWatched("The Hangover", "Comedy"))))
scala> val movieOuterJoinedGraph = graph.outerJoinVertices(movies)((_, person, movie) => (person, movie))
scala> movieOuterJoinedGraph.vertices.collect.foreach(println)
(4,((Ryan,53),None))
(6,((Lily,52),None))
(2,((Jessica,45),Some(MoviesWatched(Titanic,Love))))
(1,((Jacob,48),Some(MoviesWatched(Toy Story 3,kids))))
(3,((Andrew,25),Some(MoviesWatched(The Hangover,Comedy))))
(5,((Emily,22),None))
You can see that movie information is added for Jacob, Andrew, and Jessica. We can use the getOrElse method to provide default attribute values for the vertices that have no matching entry in the movies RDD:
scala> val movieOuterJoinedGraph = graph.outerJoinVertices(movies)((_, person, movie) =>
  (person, movie.getOrElse(MoviesWatched("NA", "NA"))))
scala> movieOuterJoinedGraph.vertices.collect.foreach(println)
(4,((Ryan,53),MoviesWatched(NA,NA)))
(6,((Lily,52),MoviesWatched(NA,NA)))
(2,((Jessica,45),MoviesWatched(Titanic,Love)))
(1,((Jacob,48),MoviesWatched(Toy Story 3,kids)))
(3,((Andrew,25),MoviesWatched(The Hangover,Comedy)))
(5,((Emily,22),MoviesWatched(NA,NA)))
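The difference between the two join operators is easiest to see side by side. This plain-Scala sketch (no Spark) models outerJoinVertices and joinVertices on Maps keyed by vertex ID. The helper names outerJoin and joinKeepType are hypothetical, and only the documented semantics are modeled. outerJoin keeps every vertex and hands the function an Option. joinKeepType calls the function only for matching IDs and cannot change the attribute type.

```scala
case class MoviesWatched(Movie: String, Genre: String)

val persons: Map[Long, (String, String)] = Map(
  (1L, ("Jacob", "48")), (2L, ("Jessica", "45")), (3L, ("Andrew", "25")),
  (4L, ("Ryan", "53")), (5L, ("Emily", "22")), (6L, ("Lily", "52")))

val movies: Map[Long, MoviesWatched] = Map(
  (1L, MoviesWatched("Toy Story 3", "kids")),
  (2L, MoviesWatched("Titanic", "Love")),
  (3L, MoviesWatched("The Hangover", "Comedy")))

// Like outerJoinVertices: every vertex is kept, f sees an Option of the joined value,
// and the result type VD2 may differ from the original attribute type.
def outerJoin[VD, U, VD2](vs: Map[Long, VD], other: Map[Long, U])(
    f: (Long, VD, Option[U]) => VD2): Map[Long, VD2] =
  vs.map { case (id, attr) => (id, f(id, attr, other.get(id))) }

// Like joinVertices: f runs only for matching IDs, unmatched vertices keep their
// attribute, and the attribute type VD cannot change.
def joinKeepType[VD, U](vs: Map[Long, VD], other: Map[Long, U])(
    f: (Long, VD, U) => VD): Map[Long, VD] =
  vs.map { case (id, attr) => (id, other.get(id).map(u => f(id, attr, u)).getOrElse(attr)) }

val withDefaults = outerJoin(persons, movies) { (_, person, movie) =>
  (person, movie.getOrElse(MoviesWatched("NA", "NA")))
}

val tagged = joinKeepType(persons, movies) { (_, person, movie) =>
  (person._1 + " (" + movie.Genre + ")", person._2)
}
```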
All of the graph operations in previous sections are invoked on a graph and return a new graph object. In this section, let's learn about operations that transform VertexRDD and EdgeRDD collections. These collections are subtypes of RDD[(VertexId, VD)] and RDD[Edge[ED]], respectively.
mapValues takes a map function as input and transforms each vertex attribute in the VertexRDD, returning a new VertexRDD object while preserving the original vertex indices. mapValues is overloaded so that the map function can take either VD or (VertexId, VD) as input. The type of the new vertex attributes can be different from VD:

def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
Similarly, EdgeRDD's mapValues transforms the edge attributes while preserving the structure:

def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
The filter method filters VertexRDD collections without changing the vertex indexing. The filter method is not overloaded, and its predicate takes a (VertexId, VD) tuple:

def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
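The diff operation also filters the vertices of a VertexRDD collection by comparing it with another one. For each vertex present in both collections, it returns only the vertices whose attribute values differ, keeping the values from the other collection:

def diff(another: VertexRDD[VD]): VertexRDD[VD]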
The minus operation acts as a set difference: it returns only the vertices whose VertexId is unique to this set, that is, not present in the other set:

def minus(another: RDD[(VertexId, VD)]): VertexRDD[VD]
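Because diff and minus are easy to confuse, this plain-Scala sketch (no Spark) models both on Maps keyed by vertex ID. It follows the semantics described in the GraphX VertexRDD documentation. minus keeps the IDs that occur only in the first collection. diff keeps the shared IDs whose values differ and takes the other collection's value. The Maps and helper functions are hypothetical illustrations, not GraphX's implementation.

```scala
// Plain-Scala model of VertexRDD.minus and VertexRDD.diff semantics.
val a: Map[Long, String] = Map(1L -> "x", 2L -> "y", 3L -> "z")
val b: Map[Long, String] = Map(2L -> "y", 3L -> "changed", 4L -> "w")

// minus: keep the vertices of self whose IDs do not appear in other.
def minus[VD](self: Map[Long, VD], other: Map[Long, VD]): Map[Long, VD] =
  self.filter { case (id, _) => !other.contains(id) }

// diff: for IDs present in both, keep only those whose values differ, using other's value.
def diff[VD](self: Map[Long, VD], other: Map[Long, VD]): Map[Long, VD] =
  other.filter { case (id, v) => self.get(id).exists(_ != v) }
```

With these inputs, minus(a, b) keeps only vertex 1, while diff(a, b) keeps only vertex 3 with the value "changed".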
Two join operations, innerJoin and leftJoin, are optimized for VertexRDD collections and use their internal indexing to accelerate joins:

def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
def leftJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, Option[U]) => VD2): VertexRDD[VD2]
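The two signatures differ in which vertices survive. This plain-Scala sketch (no Spark) models them on Maps keyed by vertex ID. innerJoin keeps only IDs present on both sides, and leftJoin keeps every ID of the left side, passing an Option for the right-hand value. The data and helper names are hypothetical.

```scala
val names: Map[Long, String] = Map(1L -> "Jacob", 2L -> "Jessica", 4L -> "Ryan")
val genres: Map[Long, String] = Map(1L -> "kids", 2L -> "Love", 3L -> "Comedy")

// innerJoin: f is (VertexId, VD, U) => VD2, and only IDs present on both sides survive.
def innerJoin[VD, U, VD2](self: Map[Long, VD], other: Map[Long, U])(
    f: (Long, VD, U) => VD2): Map[Long, VD2] =
  self.collect { case (id, vd) if other.contains(id) => (id, f(id, vd, other(id))) }

// leftJoin: f is (VertexId, VD, Option[U]) => VD2, and every ID of self survives.
def leftJoin[VD, U, VD2](self: Map[Long, VD], other: Map[Long, U])(
    f: (Long, VD, Option[U]) => VD2): Map[Long, VD2] =
  self.map { case (id, vd) => (id, f(id, vd, other.get(id))) }

val inner = innerJoin(names, genres)((_, n, g) => s"$n:$g")
val left = leftJoin(names, genres)((_, n, g) => s"$n:${g.getOrElse("none")}")
```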
The innerJoin operator can be applied to join two EdgeRDDs as well:

def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

This is similar to the innerJoin method for VertexRDD, except that its input function takes the source and destination vertex IDs along with both edge attributes. This innerJoin uses the same partitioning strategy as the original EdgeRDD.
GraphX 1.6 provides many algorithms out of the box. Let's learn about PageRank, triangle counting, and connected components using the graph we created in the previous section.
PageRank measures the importance of each vertex in a graph. The following example creates a tolerance value and calls the graph's pageRank method with it. The resulting vertex ranks are stored in a new value, ranking. To make the ranking more meaningful, the ranking values are joined with the original vertices RDD, so that the rankByPerson value contains the rank, vertex ID, and person's name.
Make sure to allocate additional memory (with the --driver-memory parameter) to avoid an out-of-memory exception. You can also run in local mode with more memory assigned to the driver, as shown in the following command. If you restart the shell, make sure to reconstruct the graph by executing the commands shown in the Creating a graph section:

spark-shell --master local[*] --driver-memory 3G
scala> val tolerance = 0.0001
scala> val ranking = graph.pageRank(tolerance).vertices
scala> val rankByPerson = vertices.join(ranking).map {
  case (id, ((person, age), rank)) => (rank, id, person)
}
The PageRank result, held in rankByPerson, is then printed record by record, using a case statement to extract the record contents and a format string to print them:

scala> rankByPerson.collect().foreach { case (rank, id, person) =>
  println(f"Rank $rank%1.2f for id $id person $person")
}
The output is shown here. As expected, Jacob and Jessica have the highest rank, as they have the most relationships:

Rank 0.15 for id 4 person Ryan
Rank 0.15 for id 6 person Lily
Rank 1.62 for id 2 person Jessica
Rank 1.82 for id 1 person Jacob
Rank 1.13 for id 3 person Andrew
Rank 1.13 for id 5 person Emily
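To see where these numbers come from, here is a plain-Scala sketch (no Spark) of the non-normalized PageRank update described in the GraphX PageRank documentation: PR(v) = resetProb + (1 - resetProb) * sum of PR(u) / outDeg(u) over the in-neighbours u of v, with resetProb = 0.15. It is a simplified sequential iteration that stops when no rank changes by more than the tolerance. It is not GraphX's Pregel-based implementation, and the starting value of 1.0 is an assumption. Ryan and Lily end up at exactly 0.15 because no edges point to them.

```scala
// The 12 family edges as (srcId, dstId).
val edgeList: Seq[(Long, Long)] = Seq(
  (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
  (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))

def pageRank(edges: Seq[(Long, Long)], tol: Double, resetProb: Double = 0.15): Map[Long, Double] = {
  val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  val outDeg = edges.groupBy(_._1).map { case (s, es) => (s, es.size) }
  var ranks: Map[Long, Double] = ids.map(id => (id, 1.0)).toMap
  var delta = Double.MaxValue
  while (delta > tol) {
    // Sum the contributions PR(u) / outDeg(u) arriving at each destination vertex.
    val contribs = edges.groupBy(_._2).map { case (d, es) =>
      (d, es.map { case (s, _) => ranks(s) / outDeg(s) }.sum)
    }
    val next = ids.map(id => (id, resetProb + (1 - resetProb) * contribs.getOrElse(id, 0.0))).toMap
    delta = ids.map(id => math.abs(next(id) - ranks(id))).max
    ranks = next
  }
  ranks
}

val ranks = pageRank(edgeList, tol = 0.0001)
```

On this graph the iteration settles near 1.82 for Jacob, 1.62 for Jessica, 1.13 for Andrew and Emily, and 0.15 for Ryan and Lily, in line with the shell output above.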
The triangle count algorithm provides a vertex-based count of the number of triangles associated with a vertex. For instance, Jacob (1) is connected to Emily (5), who is connected to Jessica (2), and Jessica is connected to Jacob (1), so a triangle is formed. This can be useful for route finding, where minimum, triangle-free, spanning tree graphs need to be generated for route planning:
scala> val tCount = graph.triangleCount().vertices
scala> println( tCount.collect().mkString(" ") )
(4,0) (6,0) (2,2) (1,2) (3,1) (5,1)
The results show that Lily (6) and Ryan (4) have no triangles, whereas Jacob (1) and Jessica (2) have the most, as expected, since they have the most relationships.
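Triangle counting ignores edge direction, so pairs such as Husband/Wife (1 to 2 and 2 to 1) count as a single link. The following plain-Scala sketch (no Spark) counts triangles per vertex on the undirected, de-duplicated version of the family edges. It reproduces the counts above, with two triangles, {1, 2, 3} and {1, 2, 5}. It is a simple illustration of the idea, not GraphX's distributed algorithm, and its names are hypothetical.

```scala
val directedEdges: Seq[(Long, Long)] = Seq(
  (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
  (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))

def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
  // Drop direction, self-loops, and duplicates such as 1->2 / 2->1.
  val undirected = edges.collect {
    case (s, d) if s != d => (math.min(s, d), math.max(s, d))
  }.distinct
  val nbrs: Map[Long, Set[Long]] = undirected
    .flatMap { case (a, b) => Seq((a, b), (b, a)) }
    .groupBy(_._1)
    .map { case (v, pairs) => (v, pairs.map(_._2).toSet) }
  // Each triangle a < b < c is found exactly once: from edge (a, b) with c > b.
  val triangles = for {
    (a, b) <- undirected
    c <- (nbrs(a) intersect nbrs(b)).toSeq
    if c > b
  } yield Seq(a, b, c)
  val counts = triangles.flatten.groupBy(identity).map { case (v, vs) => (v, vs.size) }
  nbrs.keys.map(v => (v, counts.getOrElse(v, 0))).toMap
}

val tc = triangleCounts(directedEdges)
```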
When a large graph is created from data, it might contain unconnected subgraphs that are isolated from each other, with no edges between them. The connected components algorithms measure this connectivity. Depending on your processing, it may be important to know whether all the vertices are connected. Let's use two graph methods, connectedComponents and stronglyConnectedComponents. The strongly connected components method requires a maximum iteration count, which has been set to 1000. Both methods return the graph vertices, each labeled with a component ID:
scala> val iterations = 1000
scala> val connected = graph.connectedComponents().vertices
scala> val connectedS = graph.stronglyConnectedComponents(iterations).vertices
The component IDs are then joined with the original vertex records, so that each component ID can be associated with vertex information, such as the person's name:

scala> val connByPerson = vertices.join(connected).map {
  case (id, ((person, age), conn)) => (conn, id, person)
}
scala> val connByPersonS = vertices.join(connectedS).map {
  case (id, ((person, age), conn)) => (conn, id, person)
}
The results are then printed using a case statement and a format string:

scala> connByPerson.collect().foreach { case (conn, id, person) =>
  println(f"Weak $conn $id $person")
}
Weak 1 4 Ryan
Weak 1 6 Lily
Weak 1 2 Jessica
Weak 1 1 Jacob
Weak 1 3 Andrew
Weak 1 5 Emily
As expected for the connectedComponents algorithm, the results show that all vertices belong to a single component, labeled 1 (the lowest vertex ID in that component).
The stronglyConnectedComponents algorithm gives a measure of the connectivity in a graph, taking into account the direction of the relationships between vertices:

scala> connByPersonS.collect().foreach { case (conn, id, person) =>
  println(f"Strong $conn $id $person")
}
The output of the stronglyConnectedComponents algorithm is as follows:

Strong 4 4 Ryan
Strong 6 6 Lily
Strong 1 2 Jessica
Strong 1 1 Jacob
Strong 1 3 Andrew
Strong 1 5 Emily
So, the strong method output shows that most vertices belong to one component, signified by the 1 in the second column. However, vertices 4 (Ryan) and 6 (Lily) only have outgoing edges (to Jacob), so no other vertex can reach them along the edge directions. Each of them therefore forms its own strongly connected component, labeled with its own vertex ID.
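Both results can be reproduced with a small plain-Scala sketch (no Spark) that labels each vertex with the lowest vertex ID in its component, which matches the labels printed above. Weak components ignore direction. In a strong component, every pair of vertices can reach each other along edge directions. The brute-force reachability check used here is only suitable for tiny graphs and is not how GraphX computes components. The helper names are hypothetical.

```scala
val edgeList: Seq[(Long, Long)] = Seq(
  (6L, 1L), (1L, 2L), (2L, 1L), (5L, 1L), (5L, 2L), (3L, 1L),
  (3L, 2L), (4L, 1L), (1L, 5L), (1L, 3L), (2L, 5L), (2L, 3L))
val vertexIds: Seq[Long] = edgeList.flatMap { case (s, d) => Seq(s, d) }.distinct

def adjacency(es: Seq[(Long, Long)]): Map[Long, Seq[Long]] =
  es.groupBy(_._1).map { case (s, out) => (s, out.map(_._2)) }

// All vertices reachable from start (including start itself), via depth-first search.
def reachable(start: Long, adj: Map[Long, Seq[Long]]): Set[Long] = {
  @annotation.tailrec
  def loop(frontier: List[Long], seen: Set[Long]): Set[Long] = frontier match {
    case Nil => seen
    case v :: rest =>
      val next = adj.getOrElse(v, Seq.empty[Long]).filterNot(seen)
      loop(next.toList ++ rest, seen ++ next)
  }
  loop(List(start), Set(start))
}

val directedAdj = adjacency(edgeList)
val undirectedAdj = adjacency(edgeList ++ edgeList.map(_.swap))

// Weak: component ID = lowest vertex ID reachable when direction is ignored.
val weak: Map[Long, Long] = vertexIds.map(v => (v, reachable(v, undirectedAdj).min)).toMap

// Strong: u shares v's component when each can reach the other along edge directions.
val strong: Map[Long, Long] = vertexIds.map { v =>
  val mutual = reachable(v, directedAdj).filter(u => reachable(u, directedAdj).contains(v))
  (v, mutual.min)
}.toMap
```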