ML pipeline on graph data and semi-supervised graph-based learning

Due to the big data deluge, large amounts of unlabeled data are now available alongside very small amounts of labeled data. As already discussed, labeling and annotating this data is expensive and an obstacle to finding real insight in the data. Moreover, the continuing growth of social networks and media produces graph data at scale. These trends motivate the development of real-time and large-scale semi-supervised learning methods that can exploit the information in the input distribution.

The idea behind graph-based semi-supervised learning is to construct a graph connecting similar data points, so that the hidden, unobserved labels can be treated as random variables on the nodes of this graph. In this type of learning, similar data points tend to have similar labels, and label information propagates from the labeled data points to the unlabeled ones. Such formulations, however, can be hard to express in typical processing and analytical pipelines, and graph-based learning does not reduce trivially to an iterative diffusion technique such as PageRank, since many computational aspects are involved under the hood.
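
To make the propagation idea concrete, here is a minimal, hypothetical sketch (the toy vertices, labels, and edges are our own assumptions, not the topic data used later in this section; we also assume the Spark shell's sc) that spreads labels from labeled to unlabeled nodes over a small GraphX graph using the aggregateMessages operator:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Toy graph: each vertex carries an optional class label (None = unlabeled)
val labeledNodes: RDD[(VertexId, Option[Int])] = sc.parallelize(Seq(
  (1L, Some(0)), (2L, None), (3L, None), (4L, Some(1)), (5L, None)))
val similarityEdges: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0), Edge(4L, 5L, 1.0)))
var toyGraph = Graph(labeledNodes, similarityEdges)

// A few diffusion steps: every unlabeled vertex adopts a neighbor's label
for (_ <- 1 to 3) {
  val msgs = toyGraph.aggregateMessages[Int](
    ctx => {
      ctx.srcAttr.foreach(label => ctx.sendToDst(label))
      ctx.dstAttr.foreach(label => ctx.sendToSrc(label))
    },
    (a, _) => a) // merge competing labels by keeping the first one
  toyGraph = toyGraph.outerJoinVertices(msgs) {
    (_, oldLabel, incoming) => oldLabel.orElse(incoming)
  }
}
toyGraph.vertices.collect().foreach(println)

Real label propagation algorithms weight the neighbors and take votes; this sketch only illustrates how label information flows along the edges of the graph.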

Moreover, due to API restrictions and unavailability, we will not discuss graph-based machine learning in detail with suitable examples. Instead, in this section, we will develop a graph-based semi-supervised application, which is essentially a continuation of the topic modeling that we presented in the previous section.

Introduction to GraphX

GraphX is a comparatively new component in Spark for graph processing, graph analytics, graph visualization, and graph-parallel computation. It extends the original Spark RDD abstraction by introducing a new graph abstraction layer, a resilient distributed property graph, which provides resilience in both graph processing and storage.

To support graph-related computation, GraphX exposes a set of basic operators such as subgraph, joinVertices, and aggregateMessages. In addition to this, it includes an optimized variant of the Pregel API.
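
As a small taste of these operators, the following hedged sketch applies subgraph to keep only the edges whose term attribute matches a given word (the term "spark" here is a hypothetical example); it assumes the graph value that we construct in Step 7 later in this section:

// Keep only the edges labeled with the given term; the vertices are preserved
val sparkOnly = graph.subgraph(epred = triplet => triplet.attr == "spark")
println(sparkOnly.edges.count)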

Moreover, to simplify graph analytics tasks, GraphX ships with a growing collection of graph algorithms and builders.

In the next section, we will show how to parse and handle large-scale graph data using the GraphX API of Spark to find the connected components from the topics data shown in Figure 11.

Getting and parsing graph data using the GraphX API

In this subsection, we will show you how to parse graph data using the GraphX API; finding the connected components of the resulting graph is described in the next subsection. Due to API limitations in GraphX, we could not provide the same implementation in Java, so we provide a Scala implementation only.

To run the following source code, go to your Spark distribution and start the Spark shell by providing the following commands:

$ cd home/spark-2.0.0-bin-hadoop2.7/bin
$ ./spark-shell

Then the Spark shell will start with a Spark session available. We assume your Spark distribution is located at home/spark-2.0.0-bin-hadoop2.7; change the path accordingly before starting the Spark shell. Also, before you proceed with the next steps, save the topic terms shown in Figure 11 into separate text files so that you can analyze those terms as graph data:
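
For example (the terms here are purely hypothetical placeholders, not the actual output of Figure 11), a file such as topic_0.txt could simply contain one topic's terms separated by whitespace:

spark cluster graph pipeline rdd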

Step 1: Loading required packages and APIs

Here is the code to load the required packages:

package com.examples.graphs // omit this line when pasting into the Spark shell
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

Step 2: Prepare the Spark environment

Here is the code to prepare the Spark environment:

// In the Spark shell, a SparkContext is already available as sc;
// create one explicitly only in a standalone application
val conf = new SparkConf().setAppName("GraphXDemo").setMaster("local[*]")
val sc = new SparkContext(conf)

Step 3: Parse the topic's terms and tokenize them

The following code parses the topic's terms and tokenizes them:

// Point the path at the directory where you saved the topic files
val corpus: RDD[String] = sc.wholeTextFiles("home/ /topics/*.txt").map(_._2)
val tokenized: RDD[Seq[String]] = corpus.map(_.toLowerCase.split("\\s+"))
tokenized.foreach { x => println(x) }

Step 4: Create RDDs of documents

Create the RDD of documents in the form RDD[(DocumentID, (nodeName, wordCount))]; a sample element would be (0L, ("Topic_0", 4)):

val nodes: RDD[(VertexId, (String, Long))] = tokenized.zipWithIndex().map {
  case (tokens, id) =>
    val nodeName = "Topic_" + id
    (id, (nodeName, tokens.size))
}
nodes.collect().foreach {
  x => println(x._1 + ": (" + x._2._1 + "," + x._2._2 + ")")
}

The preceding print method generates the output as shown in Figure 12:

Figure 12: The nodes.

Step 5: Make word-document pairs

Here is the code to pair each word with the document it appears in:

val wordPairs: RDD[(String, Long)] =
  tokenized.zipWithIndex.flatMap {
    case (tokens, id) =>
      // Pair every term with the ID of the document it appears in
      tokens.map(term => (term, id))
  }
wordPairs.collect().foreach(x => println(x))
println(wordPairs.count())

Step 6: Create the graph relationships between nodes

The following code creates the graph relationships between the nodes:

val relationships: RDD[Edge[String]] = wordPairs.groupByKey().flatMap {
  case (edge, nodes) =>
    val nodesList = nodes.toArray
    // Pre-allocate space for every possible pair of documents sharing the term
    val list = new Array[Edge[String]](nodesList.length * nodesList.length)
    if (nodesList.length > 1) {
      var count: Int = 0
      // Connect every pair of documents that contain the same term
      for (i <- 0 to nodesList.length - 2) {
        for (j <- i + 1 to nodesList.length - 1) {
          list(count) = new Edge(nodesList(i), nodesList(j), edge)
          //list(count + 1) = new Edge(nodesList(j), nodesList(i), edge)
          count += 1
          //count += 2
        }
      }
    }
    list.toSeq
}.filter { x => x != null }
relationships.collect().foreach { x => println(x) }

Note: if you want the relationships in both directions, so that the graph behaves as an undirected one, just enable the following line:

list(count+1) = new Edge(nodesList(j), nodesList(i), edge) 

Immediately after the following line:

list(count) = new Edge(nodesList(i), nodesList(j), edge)  

Also increment the count by 2, that is, count += 2, to keep the changes consistent.
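
With both changes applied, the body of the inner loop would look like this (a sketch of the modification just described):

list(count) = new Edge(nodesList(i), nodesList(j), edge)
list(count + 1) = new Edge(nodesList(j), nodesList(i), edge) // reverse direction
count += 2

Note that the pre-allocated array, sized nodesList.length * nodesList.length, is already large enough to hold the extra reverse edges.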

Step 7: Initialize the graph

The following code shows how to initialize the graph:

val graph = Graph(nodes, relationships) 
println(graph.edges.count) 

Finding the connected components

According to the API documentation at http://spark.apache.org/docs/latest/graphx-programming-guide.html, the connected components algorithm labels each connected component of the graph with the ID of its lowest-numbered vertex. For example, in social network analysis, connected components can approximate clusters. To make this even easier and faster, the GraphX API contains an implementation of the algorithm in the ConnectedComponents object.

However, there is no Java, Python, or R-based implementation for finding the connected components, so we use Scala to explore the connections between the topics that we calculated using the LDA algorithm, where topics are linked through one or more shared terms, as follows:

val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " contains the terms \"" + triplet.attr + "\" like as " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

This should produce the output as shown in Figure 13:

Figure 13: Connection between the topics using GraphX.

If you look at the output in Figure 13 carefully, you can see that we have printed the relationships as triplets. It is to be noted that, in addition to supporting vertices and edges, Spark GraphX also has the notion of a triplet. More technically, a triplet is an object that extends the Edge class; from the graph perspective, it stores the information about an edge together with the vertices adjacent to it in the graph.
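
Finally, although the preceding code only prints the term-based relationships, the ConnectedComponents object mentioned earlier can compute the components themselves. Here is a minimal sketch of one way to invoke it on the same graph and join the resulting component IDs back to the topic names (the join and the printed message format are our own additions):

// Run the GraphX connected components algorithm on the topic graph;
// each vertex gets labeled with the lowest vertex ID in its component
val cc = graph.connectedComponents().vertices
nodes.join(cc).collect().foreach {
  case (_, ((name, _), component)) =>
    println(name + " belongs to component " + component)
}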
