The analysis of network connectedness

Next, we are going to visually explore and analyze the connectedness of the food network. Reload the ingredient and compound datasets using the steps explained in the previous chapter. After you are done, create a GraphStream graph object:

// Create a SingleGraph class for GraphStream visualization 
val graph: SingleGraph = new SingleGraph("FoodNetwork")
Then, set the ui.stylesheet attribute of the graph. Since the food network is a bipartite graph, it would be nice to display the nodes in two different colors. We do that with a new style sheet. While we are at it, let's also reduce the node size and hide the text attributes (a sketch of how to attach this style sheet to the graph follows the tip below):
node {
    size: 5px;
    text-mode: hidden;
    z-index: 1;
    fill-mode: dyn-plain;
    fill-color: "#e7298a", "#43a2ca";
}
edge {
    shape: line;
    fill-color: #fee6ce;
    arrow-size: 2px, 1px;
    z-index: 0;
}

Tip

The color value in the style sheet is set in hexadecimal using #. You can choose your favorite colors from the awesome ColorBrewer palettes available at http://colorbrewer2.org/.
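
The style sheet itself is just a string that we attach to the graph through its ui.stylesheet attribute. Here is a minimal sketch, assuming the CSS above is kept in a styleSheet value:

// Assumption: the style sheet above is stored in a plain Scala string
val styleSheet: String =
  """
    |node { size: 5px; text-mode: hidden; z-index: 1;
    |       fill-mode: dyn-plain; fill-color: "#e7298a", "#43a2ca"; }
    |edge { shape: line; fill-color: #fee6ce; arrow-size: 2px, 1px; z-index: 0; }
  """.stripMargin

// Attach it so GraphStream applies the style when rendering
graph.addAttribute("ui.stylesheet", styleSheet)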

Let's now load the nodes and edges from foodNetwork to the GraphStream graph again, using the addNode and addEdge methods. This time, we are going to dynamically set the color of the nodes, depending on whether it is an ingredient or a compound:

// Load the graphX vertices into GraphStream nodes
for ((id: VertexId, fnn: FNNode) <- foodNetwork.vertices.collect()) {
  val node = graph.addNode(id.toString).asInstanceOf[SingleNode]
  node.addAttribute("name", fnn.name)
  node.addAttribute("ui.label", fnn.name)
  // Color the node by type: 1 for compounds, 0 for ingredients
  if (fnn.isInstanceOf[Compound])
    node.addAttribute("ui.color", 1: java.lang.Double)
  else
    node.addAttribute("ui.color", 0: java.lang.Double)
}

Note

You may ask yourself why we used isInstanceOf[T] to determine the type of the nodes when loading the nodes with addNode. Why did we not use Scala's awesome pattern matching feature? We could have used it in a standalone Spark program, but it is not currently possible to pattern match on case classes in the Spark shell. So, that is why we used isInstanceOf[T].
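
For reference, in a standalone program the same branch could be written with a type pattern. This is only a sketch of the alternative, using the fnn and node values from the loop above; it is not meant to be typed into the shell:

// Alternative for a standalone program: match on the node's runtime type
fnn match {
  case _: Compound => node.addAttribute("ui.color", 1: java.lang.Double)
  case _           => node.addAttribute("ui.color", 0: java.lang.Double)
}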

Loading the nodes of the food network is almost the same as for the social ego network. The only difference is setting different colors for the nodes. In a similar fashion, load the edges into the GraphStream graph object:

// Load the graphX edges into GraphStream edges 
for (Edge(x, y, _) <- foodNetwork.edges.collect()) {
  val edge = graph.addEdge(x.toString ++ y.toString,
                           x.toString, y.toString,
                           true).asInstanceOf[AbstractEdge]
}
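
Before displaying, it can be reassuring to check that the GraphStream object mirrors the GraphX graph. A small sketch that simply prints the counts on both sides:

// Sanity check: compare the number of elements on both sides
println(s"GraphX: ${foodNetwork.numVertices} vertices, ${foodNetwork.numEdges} edges")
println(s"GraphStream: ${graph.getNodeCount} nodes, ${graph.getEdgeCount} edges")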

To visualize the food network, call graph.display(). You will get something like this:

[Figure: GraphStream visualization of the bipartite food network, with ingredient and compound nodes shown in two different colors]

From this picture, we can see that many ingredients share the same compounds, whereas some compounds are found in only a few ingredients. Similar to the social ego network, this network consists of some isolated nodes and a giant component of connected nodes. This leads us to our next topic, which is the measure of the connectedness of graphs.

Finding the connected components

In a network, two nodes are connected if there is a path between them on the graph. A network is called connected if all node pairs are connected. Otherwise, a disconnected network consists of several components, each of which is connected. Finding the connected components of a graph is easy in GraphX: we simply call the connectedComponents method.

Using the food network as an example, we can verify that it has exactly 27 components:

// Connected Components  
scala> val cc = foodNetwork.connectedComponents() 
cc: org.apache.spark.graphx.Graph[VertexId,Int] 
 
// Number of components  
scala> cc.vertices.map(_._2).collect.distinct.size 
res: Int = 27

Given the type of cc above, we see that connectedComponents returns another graph with the same number of vertices. The vertices belonging to the same component share the same attribute, whose value is the smallest vertex ID in that component. In other words, the attribute of each node identifies its component. Let's look at these component identifiers:

scala> cc.vertices.map(_._2).distinct.collect
res6: Array[org.apache.spark.graphx.VertexId] = Array(892, 0, 1344, 528, 468, 392, 960, 409, 557, 529, 585, 1105, 233, 181, 481, 1146, 970, 622, 1186, 514, 1150, 511, 47, 711, 1211, 1511, 363)

Now, suppose we want to list the components and their number of nodes in descending order. To do this, we can chain Spark's RDD operations groupBy and sortBy:

scala> cc.vertices.groupBy(_._2).
         map(p => (p._1, p._2.size)).
         sortBy(x => x._2, false).collect()
res: Array[(VertexId, Int)] = Array((0,2580), (528,8), (1344,3), (392,3), (585,3), (481,3), (892,2), (960,2), (409,2), (557,2), (529,2), (1105,2), (181,2), (970,2), (622,2), (1186,2), (1150,2), (511,2), (47,2), (711,2), (1511,2), (363,2), (468,1), (233,1), (1146,1), (514,1), (1211,1))

The giant component has 2580 ingredient and compound nodes, among which the node with the smallest vertex ID is 0. In general, we can define a function that takes the graph of connected components, and returns the smallest vertex ID and number of nodes in the largest component, as follows:

def largestComponent(cc: Graph[VertexId, Int]): (VertexId, Int) =
  cc.vertices.map(x => (x._2, x._1)).
    groupBy(_._1).
    map(p => (p._1, p._2.size)).
    max()(Ordering.by(_._2))

In this function, we group the vertices of the components graph by component ID. Then, we map each component to a key-value pair, where the key is the component ID and the value is the number of nodes in that component. Finally, we use the reduction operator max to return the key-value pair corresponding to the largest component. Note that we had to pass two argument lists to the max method. The first one is empty, whereas the second one is an implicit parameter that expects an ordering. To compare the pairs on their second element, we pass the right ordering to max as Ordering.by(_._2).
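
Applied to the components graph computed earlier, the function simply recovers the giant component we already identified in the sorted list above:

// Smallest vertex ID and size of the largest component
scala> largestComponent(cc)
res: (VertexId, Int) = (0,2580)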

Note

In addition to GraphX's graph-specific operations, Spark's RDD and pair RDD operations can be very useful for certain tasks. This function is a canonical example of a data processing chain in Spark, done entirely with Spark's RDD and pair RDD operations. For more details, see the API documentation for Spark and GraphX at http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.package.

Counting triangles and computing clustering coefficients

In the following, we will use the Enron email dataset to illustrate how to analyze the connectedness of a graph with triangle counting and clustering coefficients. A triangle is a connected subgraph of three nodes. Counting how many triangles pass through each node helps quantify the connectedness of a graph. In particular, triangle counting is required to compute the clustering coefficients, which measure the local density of the neighborhood of each node in the network.

Currently, there is a restriction imposed by the triangle counting implementation in Spark on the input graph. Specifically, the edges of the input graph should be in a canonical direction; that is, the sourceId parameter must be less than the destId parameter. For the email graph, this implies that there should be at most one directed link between two people. This restriction is not that severe, since we can still assume that each directed link in the email graph implies a bidirectional communication between the two people. We can impose this constraint by filtering out the edges for which the ID of the source is larger than that of the destination node. In addition to this restriction, the input graph must also have been partitioned with partitionBy. Thus, we can load the email graph as follows:

val emailGraph =
  GraphLoader.edgeListFile(sc, "./data/emailEnron.txt").
    subgraph(epred = t => t.srcId < t.dstId).
    partitionBy(PartitionStrategy.RandomVertexCut)

Once the Enron email graph is loaded, we can compute the triangle counts:

scala> emailGraph.triangleCount()
res: Graph[Int, Int]

scala> val triangleCounts = emailGraph.triangleCount().vertices
triangleCounts: VertexRDD[Int]

Similar to connectedComponents, the triangleCount algorithm also returns a new graph with the same number of nodes. This time, however, the triangle count becomes the vertex attribute.
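
As a quick illustration of how to use this result, the following sketch lists the five vertices that sit on the most triangles; the actual counts depend on the dataset, so no output is shown here:

// Vertices with the highest triangle counts, as (count, vertexId) pairs
triangleCounts.map { case (vid, count) => (count, vid) }.top(5)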

How easy was that? Now, let's calculate the local clustering coefficients of the email network. First, we define a function that calculates the local clustering coefficient of a specific node. The clustering coefficient at a given node captures the network's local density at that node. The more densely interconnected its neighborhood is, the closer its local clustering coefficient is to 1. It can be calculated by the following function:

def clusterCoeff(tup: (VertexId, (Int, Int))): (VertexId, Double) =
  tup match {
    case (vid, (t, d)) => (vid, 2 * t.toDouble / (d * (d - 1)))
  }

The argument of clusterCoeff is a tuple whose elements consist of the vertex ID of the node at which we compute the local clustering coefficient, and of another tuple containing the triangle count and the degree of the node. It then returns the vertex ID and the clustering coefficient as a tuple. In fact, the local clustering coefficient of a given node is an estimate of the probability that each pair of its neighbors is connected. Therefore, the coefficient can be calculated as the ratio between the number of links between the node's neighbors, which is also equal to the number of triangles that pass through the node, and the number of all possible pairs of neighboring nodes.
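
As a quick sanity check, consider a hypothetical node with degree 4 that participates in 3 triangles: its coefficient is 2 * 3 / (4 * 3) = 0.5, meaning half of all possible neighbor pairs are connected:

// Hypothetical node with vertex ID 42, 3 triangles, and degree 4
scala> clusterCoeff((42L, (3, 4)))
res: (VertexId, Double) = (42,0.5)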

With this, we can compute the cluster coefficients for all the nodes:

def clusterCoefficients(graph: Graph[Int, Int]): RDD[(VertexId, Double)] = {
  val gRDD: RDD[(VertexId, (Int, Int))] =
    graph.triangleCount().vertices join graph.degrees
  gRDD map clusterCoeff
}

This last function takes a graph as input and returns an RDD of pairs, whose elements contain the vertex identifiers and the corresponding local clustering coefficients.

Note

The formula for the local clustering coefficient at a given node is well-defined only when its degree, that is, the number of its neighbors, is larger than one. If the node has a degree of one or zero, the clusterCoeff function will return a NaN value for the clustering coefficient. Therefore, we must first check whether some nodes are isolated in the network when we want to compute an average or global clustering coefficient. Not only must we filter out the leaves and isolated nodes, but we must also adjust the formula of the global clustering coefficient to avoid a biased assessment of the neighborhood clustering.

Let's now use the previous functions to compute the cluster coefficients for the email graph:

scala> val coeffs = clusterCoefficients(emailGraph)
scala> coeffs.take(10) 
res: Array[(VertexId, Double)] = Array((19021,0.9), (28730,1.0), (23776,1.0), (31037,0.0), (34207,NaN), (29127,1.0), (9831,NaN), (5354,0.0380952380952381), (32676,0.46153846153846156), (4926,1.0))

We see that for some nodes, the returned clustering coefficient has a NaN value. In fact, only 25481 of the 36692 nodes have a well-defined coefficient:

// Count the nodes with a well-defined (non-NaN) coefficient
scala> coeffs.filter(x => !x._2.isNaN).count
res: Long = 25481

To remedy this fact, we need to filter out these nodes when averaging the cluster coefficients:

// Calculate the adjusted global cluster coefficient 
scala> val nonIsolatedNodes = coeffs.filter(x => !x._2.isNaN)
nonIsolatedNodes: RDD[(VertexId, Double)]

scala> val globalCoeff = nonIsolatedNodes.map(_._2).sum / nonIsolatedNodes.count
globalCoeff: Double = 0.7156424032347621
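
Equivalently, since the coefficients form an RDD of doubles, Spark's built-in mean() action computes the same average in one step:

// Same average, using the mean() action on RDD[Double]
scala> nonIsolatedNodes.map(_._2).mean()   // same value as globalCoeff above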