Building graphs

Let's now open our Spark shell and use the graph builders introduced previously to build three types of graphs: a directed email communication network, a bipartite graph of ingredient-compound connections, and a weighted social ego network.

Note

Unless otherwise stated, we always assume that the Spark shell is launched from the $SPARKHOME directory. It then becomes the current directory for any relative file path used in this book.

Building directed graphs

The first graph that we will build is the Enron email communication network. If you have restarted your Spark shell, you need to import the GraphX library again. First, create a new folder called data inside $SPARKHOME and copy the emailEnron.txt dataset into it. This file contains the adjacency list of the email communications between the employees. Assuming that the current directory is $SPARKHOME, we can pass the file path to the GraphLoader.edgeListFile method:

scala> import org.apache.spark.graphx._
import org.apache.spark.graphx._

scala> import org.apache.spark.rdd._
import org.apache.spark.rdd._

scala> val emailGraph = GraphLoader.edgeListFile(sc, "./data/emailEnron.txt")
emailGraph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@609db0e

Notice that the GraphLoader.edgeListFile method always returns a graph object whose vertex and edge attributes have the type Int. Their default value is 1. We can check this by looking at the first five vertices and edges in the graph:

scala> emailGraph.vertices.take(5)
res: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((19021,1), (28730,1), (23776,1), (31037,1), (34207,1))

scala> emailGraph.edges.take(5)
res: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(0,1,1), Edge(1,0,1), Edge(1,2,1), Edge(1,3,1), Edge(1,4,1))

The first node (19021,1) has the vertex ID 19021 and its attribute is indeed set to 1. Similarly, the first edge Edge(0,1,1) captures the communication from the source vertex 0 to the destination vertex 1.

In GraphX, all edges must be directed. To express undirected or bidirectional graphs, we can link each connected pair in both directions. In our email network, we can verify, for instance, that the node 19021 has both incoming and outgoing links. First, we collect the destination nodes that node 19021 communicates with:

scala> emailGraph.edges.filter(_.srcId == 19021).map(_.dstId).collect()
res: Array[org.apache.spark.graphx.VertexId] = Array(696, 4232, 6811, 8315, 26007)

It turns out that these same nodes are also the source nodes for the incoming edges to 19021:

scala> emailGraph.edges.filter(_.dstId == 19021).map(_.srcId).collect()
res: Array[org.apache.spark.graphx.VertexId] = Array(696, 4232, 6811, 8315, 26007)
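
As an aside, if our edge list recorded each communicating pair only once, we could make the graph bidirectional ourselves by adding the reverse of every edge. The following is a minimal sketch of this idea (reversedEdges and symmetricGraph are our own names, not part of the dataset or the GraphX API):

// Sketch: union the original edges with their reversed copies and
// rebuild the graph, so that every link exists in both directions
val reversedEdges = emailGraph.edges.map(e => Edge(e.dstId, e.srcId, e.attr))
val symmetricGraph = Graph(emailGraph.vertices, emailGraph.edges ++ reversedEdges)

If some pairs already appeared in both directions, this union would create duplicate edges; those could be merged with the groupEdges operator after repartitioning the graph with partitionBy.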

Building a bipartite graph

In some applications, it is useful to represent a view of a system as a bipartite graph. A bipartite graph is composed of two sets of nodes: edges may only connect nodes that belong to different sets, never nodes within the same set. An example of such a graph is the food ingredient-compound network.

Here, we will work with the files ingr_info.tsv, comp_info.tsv, and ingr_comp.tsv, which should be copied into the $SPARKHOME/data folder. The first two files contain the information about the food ingredients and compounds respectively.

Let's have a quick look at the first lines of these two files using the Source.fromFile method of scala.io.Source. We use this method here only to inspect the beginning of the text files:

scala> import scala.io.Source
import scala.io.Source

scala> Source.fromFile("./data/ingr_info.tsv").getLines().
      take(7).foreach(println)
# id  ingredient name  category
0  magnolia_tripetala  flower
1  calyptranthes_parriculata  plant
2  chamaecyparis_pisifera_oil  plant derivative
3  mackerel  fish/seafood
4  mimusops_elengi_flower  flower
5  hyssop  herb

scala> Source.fromFile("./data/comp_info.tsv").getLines().
      take(7).foreach(println)
# id  Compound name  CAS number
0  jasmone  488-10-8
1  5-methylhexanoic_acid  628-46-6
2  l-glutamine  56-85-9
3  1-methyl-3-methoxy-4-isopropylbenzene  1076-56-8
4  methyl-3-phenylpropionate  103-25-3
5  3-mercapto-2-methylpentan-1-ol_(racemic)  227456-27-1

The third file contains the adjacency list between the ingredients and the compounds:

scala> Source.fromFile("./data/ingr_comp.tsv").getLines().
      take(7).foreach(println)
# ingredient id  compound id
1392  906
1259  861
1079  673
22    906
103    906
1005  906 

In practice, the datasets from which we build graphs will not come in the form that the graph builders in Spark expect. In the food network example, we face two problems. First, we cannot simply create a graph from the adjacency list, because the indices of the ingredients and compounds both start at zero and overlap with each other; there is no way to distinguish two nodes that happen to have the same vertex ID. Second, we have two kinds of nodes: ingredients and compounds.

Tip

In order to create a bipartite graph, we first need to create two case classes named Ingredient and Compound, and use Scala's inheritance so that these two classes are the children of an FNNode class.

scala> class FNNode(val name: String)
defined class FNNode

scala> case class Ingredient(override val name: String, category: String) extends FNNode(name)
defined class Ingredient

scala> case class Compound(override val name: String, cas: String) extends FNNode(name)
defined class Compound
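
As a quick sanity check, both case classes can be treated as FNNode values, which is what will let us mix them in a single RDD later on. Here is a one-line sketch, using an ingredient we saw in ingr_info.tsv:

scala> val node: FNNode = Ingredient("mackerel", "fish/seafood")
node: FNNode = Ingredient(mackerel,fish/seafood)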

After this, we need to load all the Ingredient and Compound objects into an RDD[(VertexId, FNNode)] collection. This part requires some data wrangling:

val ingredients: RDD[(VertexId, FNNode)] = 
sc.textFile("./data/ingr_info.tsv").
      filter(! _.startsWith("#")).
      map {line => 
             val row = line split '\t'
             (row(0).toInt, Ingredient(row(1), row(2)))
          }                           
ingredients: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, FNNode)] = MappedRDD[32] at map at <console>:26

In the preceding code, we first loaded the text in ingr_info.tsv into an RDD of String and filtered out the comment lines starting with #. Then, we parsed the tab-delimited lines into an RDD of Ingredient vertices. Now, let's do a similar thing with comp_info.tsv and create an RDD of Compound vertices:

val compounds: RDD[(VertexId, FNNode)] = 
sc.textFile("./data/comp_info.tsv").
      filter(! _.startsWith("#")).
      map {line => 
             val row = line split '\t'
             (10000L + row(0).toInt, Compound(row(1), row(2)))
          }                              
compounds: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, FNNode)] = MappedRDD[28] at map at <console>:26     

Note one critical detail here: since the index of each node must be unique, we had to shift the range of the compound indices by 10000L, so that no index refers to both an ingredient and a compound at the same time.

Next, we create an RDD[Edge[Int]] collection from the dataset named ingr_comp.tsv:

val links: RDD[Edge[Int]] = 
  sc.textFile("./data/ingr_comp.tsv").
     filter(! _.startsWith("#")).
     map {line => 
        val row = line split '\t'
        Edge(row(0).toInt, 10000L + row(1).toInt, 1)
     }

When parsing the lines of the adjacency list in ingr_comp.tsv, we again shift the indices of the compounds by 10000L. This quick fix works because we knew in advance, from the dataset description, how many ingredients and compounds there were. Be more careful with real, messy datasets! Next, since the links between ingredients and compounds do not carry any weight or meaningful attributes, we simply parameterized the Edge class with the Int type and set the attribute of each link to a default value of 1.
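
If we did not know the number of ingredients in advance, a more defensive approach would be to derive the offset from the data itself. The following is a minimal sketch of that idea (the value compoundOffset is our own name):

// Derive the ID offset from the largest ingredient index, so that
// shifted compound IDs can never collide with ingredient IDs
val compoundOffset: Long = ingredients.keys.max() + 1L

We could then use compoundOffset in place of the hardcoded 10000L when building both the compounds and links collections.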

Finally, we concatenate the two sets of nodes into a single RDD and pass it to the Graph() factory method, along with the links RDD:

scala> val nodes = ingredients ++ compounds
nodes: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, FNNode)] = UnionRDD[61] at $plus$plus at <console>:30

scala> val foodNetwork = Graph(nodes, links)
foodNetwork: org.apache.spark.graphx.Graph[FNNode,Int] = org.apache.spark.graphx.impl.GraphImpl@635933c1
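
Before exploring the graph, we can quickly verify that both kinds of nodes ended up in it by pattern matching on the vertex attribute type. A minimal sketch (numIngredients and numCompounds are our own names; the counts are not shown here):

// Count the two kinds of vertices in the bipartite graph
val numIngredients = foodNetwork.vertices.filter(_._2.isInstanceOf[Ingredient]).count()
val numCompounds = foodNetwork.vertices.filter(_._2.isInstanceOf[Compound]).count()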

So, let's explore the ingredient-compound graph:

scala> def showTriplet(t: EdgeTriplet[FNNode,Int]): String = "The ingredient " ++ t.srcAttr.name ++ " contains " ++ t.dstAttr.name
showTriplet: (t: EdgeTriplet[FNNode,Int])String

scala> foodNetwork.triplets.take(5).
     foreach(showTriplet _ andThen println _)
The ingredient calyptranthes_parriculata contains citral_(neral)
The ingredient chamaecyparis_pisifera_oil contains undecanoic_acid
The ingredient hyssop contains myrtenyl_acetate
The ingredient hyssop contains 4-(2,6,6-trimethyl-cyclohexa-1,3-dienyl) but-2-en-4-one
The ingredient buchu contains menthol

First, we defined a helper function called showTriplet that returns a String description of an ingredient-compound triplet. Then, we took the first five triplets and printed them out on the console. In the preceding example, we used Scala's function composition, showTriplet _ andThen println _, as the argument passed to the foreach method.
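
If the function composition looks unfamiliar, the following equivalent sketch spells it out without andThen:

// Equivalent to showTriplet _ andThen println _: first format
// each triplet as a String, then print it
foodNetwork.triplets.take(5).map(showTriplet).foreach(println)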

Building a weighted social ego network

As a final example, let's build an ego network from the Google+ dataset that we presented earlier in this chapter. An ego network is a graph representation of one person's connections. More precisely, it focuses on a single node, called the focal node, and represents only the links between that node and its neighbors. Although the entire dataset from the SNAP website contains the ego networks of 133 Google+ users, we are only going to build one person's ego network as an illustration. The files that we are going to work with are placed in $SPARKHOME/data.

Their description is given as follows:

  • ego.edges: These are the directed edges in the ego network. The ego node does not appear in this list, but it is assumed to follow every node ID that appears in the file.
  • ego.feat: This file contains the features for each of the nodes that appear in the edges file.
  • ego.featnames: This file contains the name of each of the feature dimensions. A feature is 1 if the user has this property in their profile, and 0 otherwise.

First, let's import the absolute value function from scala.math, and the SparseVector class from the Breeze library, which we will be using:

import scala.math.abs
import breeze.linalg.SparseVector

Then, let's also define a type synonym called Feature for SparseVector[Int]:

type Feature = breeze.linalg.SparseVector[Int]

Using the following code, we can read the features inside the ego.feat file and collect them in a map whose keys and values are of the Long and Feature types, respectively:

val featureMap: Map[Long, Feature] = 
  Source.fromFile("./data/ego.feat").
     getLines().
     map{line => 
     val row = line split ' '
     val key = abs(row.head.hashCode.toLong)
     val feat = SparseVector(row.tail.map(_.toInt))
     (key, feat)
     }.toMap

Let's step back and take a quick look inside the ego.feat file to understand what the preceding chain of transformations is doing and why it is needed. Each line in ego.feat starts with a node ID, followed by a space-separated string of 0s and 1s, with one digit per feature.

The first number in each line corresponds to a node's ID in the ego network. The remaining string of 0s and 1s indicates which features this particular node has. For example, the first 1 after the node's ID corresponds to the gender:1 feature. In fact, each feature has, by design, the form description:value. In practice, we usually have limited control over the format of the datasets that we work with, and as in this example, there is always some data wrangling to do. First, each vertex in the ego network should have a vertex ID of the Long type. However, the node IDs in the dataset, such as 114985346359714431656, exceed the permitted range for Long.

Therefore, we have to create new indices for the nodes. Second, we need to parse the string of 0 and 1 in the data to create a feature vector that has a more convenient form.

Luckily, these issues do have easy fixes. To convert the original node ID to a vertex ID, we simply hash the string that corresponds to the node ID, as follows:

val key = abs(row.head.hashCode.toLong)

Then, we took advantage of the SparseVector representation in the Breeze library to store the feature vectors efficiently.
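
To make the upcoming dot product step concrete, here is a toy sketch with two made-up 0/1 feature vectors: their dot product counts exactly the features that both profiles share:

// Two hypothetical binary feature vectors sharing two features
val a = SparseVector(Array(1, 0, 1, 1))
val b = SparseVector(Array(0, 1, 1, 1))
a dot b  // evaluates to 2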

Next, we can read the ego.edges file to create an RDD[Edge[Int]] collection of the links in the ego network. In contrast to our previous graph examples, we model the ego network as a weighted graph. More precisely, the attribute of each link will correspond to the number of common features that the connected pair has. This is done by the following transformations:

val edges: RDD[Edge[Int]] = 
  sc.textFile("./data/ego.edges").
     map {line => 
        val row = line split ' '
        val srcId = abs(row(0).hashCode.toLong)
        val dstId = abs(row(1).hashCode.toLong)
        val srcFeat = featureMap(srcId)
        val dstFeat = featureMap(dstId)
        val numCommonFeats = srcFeat dot dstFeat
        Edge(srcId, dstId, numCommonFeats)
     }

Tip

To find the number of common features between the source and destination nodes, we simply used the dot product operation of the SparseVector class in Breeze. Note that, as before, we had to compute new vertex IDs by hashing the node IDs in the dataset.
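
One caveat worth flagging: hashing string IDs can, in principle, produce collisions. A minimal sanity check (a sketch, reusing featureMap and the file path from the preceding code) is to compare the number of distinct keys against the number of lines in the file:

// If hashing caused a collision, two lines would map to the same key
// and featureMap would be smaller than the number of input lines
val numLines = Source.fromFile("./data/ego.feat").getLines().size
assert(featureMap.size == numLines)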

Finally, we can now create the ego network using the Graph.fromEdges function. This function takes the RDD[Edge[Int]] collection and a default value for the vertex attributes as its arguments:

val egoNetwork: Graph[Int,Int] = Graph.fromEdges(edges, 1)

Then, we can check how many links in the ego network connect pairs of users that have exactly three, two, or one feature(s) in common:

scala> egoNetwork.edges.filter(_.attr == 3).count()
res: Long = 1852

scala> egoNetwork.edges.filter(_.attr == 2).count()
res: Long = 9353

scala> egoNetwork.edges.filter(_.attr == 1).count()
res: Long = 107934
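
Rather than filtering once per attribute value, we could also compute the whole distribution of common-feature counts in a single pass. A minimal sketch (output not shown):

// Histogram of edge attributes: (number of common features, edge count)
egoNetwork.edges.
  map(e => (e.attr, 1L)).
  reduceByKey(_ + _).
  sortByKey().
  collect()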