Using GraphX to analyze Twitter data

GraphX is Spark's API for graphs and graph-parallel computation. In this recipe, we will preview what is possible with the GraphX component of Spark.

How to do it...

Now that we have the Twitter data stored in the ElasticSearch index, we will perform the following tasks on this data using a graph:

  1. Convert the ElasticSearch data into a Spark Graph.
  2. Sample vertices, edges, and triplets in the graph.
  3. Find the top group of connected hashtags (connected component).
  4. List all the hashtags in that component.

  1. Converting the ElasticSearch data into a graph: This involves two steps:
    1. Converting the ElasticSearch data into a DataFrame: As we saw in an earlier recipe, this step is just a one-liner:
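      import org.apache.spark.sql.{DataFrame, SQLContext}
      import org.elasticsearch.spark.sql._ // adds esDF to SQLContext

      // Read the spark/twstatus index/type into a DataFrame
      def convertElasticSearchDataToDataFrame(sqlContext: SQLContext): DataFrame = {
          val twStatusDf = sqlContext.esDF("spark/twstatus")
          twStatusDf
      }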
    2. Converting the DataFrame to a graph: Constructing a Spark Graph requires an RDD of vertices and an RDD of edges. Let's construct them one by one.

      The vertex RDD is an RDD of tuples, each holding a vertex ID and a vertex property. In our case, we simply use the hash code of the lowercased hashtag as the vertex ID and the hashtag itself as the property:

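      import org.apache.spark.rdd.RDD

      // df is the DataFrame returned by convertElasticSearchDataToDataFrame
      val verticesRdd: RDD[(Long, String)] = df.rdd.flatMap { tweet =>
            // A tweet without hashtags yields null here, so default to an empty Seq
            val hashTags = Option(tweet.getAs[Seq[String]]("hashTags")).getOrElse(Seq.empty)
            hashTags.map { tag =>
              val lowercaseTag = tag.toLowerCase()
              val tagHashCode = lowercaseTag.hashCode().toLong
              (tagHashCode, lowercaseTag)
            }
      }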

      For the edges, we construct an RDD[Edge], where each Edge wraps a pair of vertex IDs and a property. We use the tweet's first URL (if present) as the edge property; this recipe doesn't use it, so an empty string would work just as well. Since a tweet can carry multiple hashtags, we use the combinations function to generate every pair of hashtags and connect each pair with an edge:

      import org.apache.spark.graphx.Edge

      val edgesRdd: RDD[Edge[String]] = df.rdd.flatMap { row =>
            // Lowercase first so the IDs match the vertices; distinct avoids self-loops
            val hashTags = Option(row.getAs[Seq[String]]("hashTags")).getOrElse(Seq.empty)
              .map(_.toLowerCase()).distinct

            // The first URL (if any) becomes the edge property
            val urls = Option(row.getAs[Seq[String]]("urls")).getOrElse(Seq.empty)
            val topUrl = urls.headOption.getOrElse("")

            // One edge per unordered pair of hashtags in the tweet
            hashTags.combinations(2).map { pair =>
              Edge(pair(0).hashCode.toLong, pair(1).hashCode.toLong, topUrl)
            }
      }

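    Finally, we construct the graph from both RDDs. Graph.apply keeps one vertex per ID, so the repeated (ID, hashtag) pairs produced for popular tags collapse into a single vertex:

    import org.apache.spark.graphx.Graph

    val graph: Graph[String, String] = Graph(verticesRdd, edgesRdd)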
  2. Sampling vertices, edges, and triplets in the graph: Now that we have constructed the graph, let's sample its vertices, edges, and triplets to see what they look like. A triplet is an edge together with the two vertices it connects, including the properties of both:
    graph.vertices.take(20).foreach(println)

    The output is:

    [Screenshot: the first 20 vertices, printed as (vertexId,hashtag) pairs]

    graph.edges.take(20).foreach(println)

    The output is:

    [Screenshot: the first 20 edges, printed as Edge(srcId,dstId,url)]

    graph.triplets.take(20).foreach(println)

    The output is:

    [Screenshot: the first 20 triplets, printed as ((srcId,srcHashtag),(dstId,dstHashtag),url)]
  3. Finding the top group of connected hashtags (connected component): As you know, a graph is made of vertices and edges. A connected component of a graph is a subgraph in which every vertex can reach every other vertex through some path of edges. If two vertices are not connected, either directly or indirectly through other vertices, they belong to different connected components.

    GraphX's graph.connectedComponents returns a new graph with the same vertices and edges, in which each vertex's property is the ID of its connected component (the lowest vertex ID in that component):

    val connectedComponents = graph.connectedComponents().cache()

    Let's take the component ID with the maximum number of vertices and then extract the vertices (and eventually the hashtags) that belong to that component:

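    import org.apache.spark.graphx.VertexId

    // Vertices per component ID; countByValue collects the counts to the driver as a scala.collection.Map
    val ccCounts: scala.collection.Map[VertexId, Long] =
      connectedComponents.vertices.map { case (_, componentId) => componentId }.countByValue()

    // Get the top component ID and its vertex count
    val topComponent: (VertexId, Long) = ccCounts.maxBy(_._2)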

    Since topComponent holds only the component ID, we need a mapping from each hashtag to its component ID in order to fetch the hashtags of the top component. We get this by joining the graph's vertices (which carry the hashtags) with the vertices of connectedComponents (which carry the component IDs) on the vertex ID:

    import org.apache.spark.graphx.{VertexId, VertexRDD}

    // RDD of (hashTag, componentId) pairs, joined on vertexId
    val hashtagComponentRdd: VertexRDD[(String, VertexId)] =
      graph.vertices.innerJoin(connectedComponents.vertices) { (vertexId, hashTag, componentId) =>
        (hashTag, componentId)
      }

    Now that each hashtag is paired with its component ID, let's keep only the hashtags that belong to the top component:

    val topComponentHashTags = hashtagComponentRdd
      .filter { case (_, (_, componentId)) => componentId == topComponent._1 }
      .map { case (_, (hashTag, _)) => hashTag }

    The entire method looks like this:

    import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
    import org.apache.spark.rdd.RDD

    def getHashTagsOfTopConnectedComponent(graph: Graph[String, String]): RDD[String] = {
        // Get all the connected components; each vertex property becomes its component ID
        val connectedComponents = graph.connectedComponents().cache()

        // Count the vertices in each component (the counts are collected to the driver)
        val ccCounts: scala.collection.Map[VertexId, Long] =
          connectedComponents.vertices.map { case (_, componentId) => componentId }.countByValue()

        // Get the top component ID and its vertex count
        val topComponent: (VertexId, Long) = ccCounts.maxBy(_._2)

        // RDD of (hashTag, componentId) pairs, joined on vertexId
        val hashtagComponentRdd: VertexRDD[(String, VertexId)] =
          graph.vertices.innerJoin(connectedComponents.vertices) { (_, hashTag, componentId) =>
            (hashTag, componentId)
          }

        // Keep only the hashtags that belong to the top component
        hashtagComponentRdd
          .filter { case (_, (_, componentId)) => componentId == topComponent._1 }
          .map { case (_, (hashTag, _)) => hashTag }
    }
  4. Listing all the hashtags in that component: Saving the hashtags is as simple as calling saveAsTextFile. Note that saveAsTextFile writes a directory (here named topTags.txt) containing part files; repartition(1) ensures that the directory holds a single part file. Alternatively, you could call collect() to bring all the hashtags to the driver and inspect them there:
    def saveTopTags(topTags: RDD[String]): Unit = {
        topTags.repartition(1).saveAsTextFile("topTags.txt")
    }

The number of hashtags in the top connected component for our run was 7,320. This shows that our sample stream contains 7,320 interrelated tags around the topic of fashion. Because two tags only need to be linked through some chain of co-occurring tags to land in the same component, they could be synonyms, closely related terms, or only remotely related to fashion. A snapshot of the file looks like this:

[Screenshot: snapshot of the saved file of top-component hashtags]
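The vertex step of this recipe (step 1) can be checked without a cluster by replaying its logic on plain Scala collections. The following sketch is an illustration under that assumption, not the recipe's Spark code: the object name `HashTagVertices` is hypothetical, and each tweet is represented only by its list of hashtags.

```scala
// Plain-Scala sketch (no Spark) of the vertex step: every hashtag occurrence
// yields a (vertexId, hashtag) pair, with the ID taken from the lowercased
// tag's hashCode widened to Long. HashTagVertices is a hypothetical name.
object HashTagVertices {
  // Assumption: a tweet is modeled simply as its list of hashtags.
  def vertices(tweets: Seq[Seq[String]]): Seq[(Long, String)] =
    tweets.flatMap { hashTags =>
      hashTags.map { tag =>
        val lowercaseTag = tag.toLowerCase
        (lowercaseTag.hashCode.toLong, lowercaseTag)
      }
    }
}
```

For example, `HashTagVertices.vertices(Seq(Seq("Fashion", "OOTD"), Seq("fashion", "style")))` returns four pairs but only three distinct ones, since `Fashion` and `fashion` share an ID; in GraphX, `Graph.apply` likewise keeps a single vertex per ID. Because `String.hashCode` is a 32-bit value, two different hashtags can in principle collide on the same ID.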
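The edge construction from step 1 can likewise be replayed on plain Scala collections, without Spark. In this sketch, `SimpleEdge` is a hypothetical stand-in for GraphX's `Edge` case class and `HashTagEdges` is a hypothetical helper; it takes one tweet's hashtags and URLs and emits one edge per unordered pair of distinct, lowercased hashtags, using the first URL (or an empty string) as the edge property.

```scala
// Plain-Scala sketch (no Spark) of the edge step.
// SimpleEdge is a hypothetical stand-in for GraphX's Edge case class.
final case class SimpleEdge(srcId: Long, dstId: Long, attr: String)

object HashTagEdges {
  def edges(hashTags: Seq[String], urls: Seq[String]): Seq[SimpleEdge] = {
    // The first URL, if any, becomes the edge property
    val topUrl = urls.headOption.getOrElse("")
    hashTags
      .map(_.toLowerCase)
      .distinct       // a repeated tag would otherwise create a self-loop
      .combinations(2) // each unordered pair of tags, emitted once
      .map(pair => SimpleEdge(pair(0).hashCode.toLong, pair(1).hashCode.toLong, topUrl))
      .toSeq
  }
}
```

A tweet tagged a, b, and c therefore yields three edges: (a,b), (a,c), and (b,c). Deduplicating the lowercased tags first is a choice of this sketch; it keeps a tweet that carries both `#Fashion` and `#fashion` from producing a self-loop.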
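To make the sampled triplets from step 2 concrete, the following plain-Scala sketch (no Spark; `TripletSketch` is a hypothetical name) joins each edge with the properties of its source and destination vertices. It produces the same ((srcId,srcAttr),(dstId,dstAttr),attr) shape in which GraphX prints an `EdgeTriplet`. Unlike GraphX, which assigns a default attribute to a vertex that appears only in the edge list, the sketch simply skips such edges.

```scala
// Plain-Scala sketch (no Spark) of what a triplet bundles together:
// an edge plus the properties of its source and destination vertices.
object TripletSketch {
  def triplets(
      vertices: Map[Long, String],    // vertexId -> hashtag
      edges: Seq[(Long, Long, String)] // (srcId, dstId, url)
  ): Seq[((Long, String), (Long, String), String)] =
    for {
      (src, dst, attr) <- edges
      srcAttr <- vertices.get(src) // edges with an unknown endpoint are skipped
      dstAttr <- vertices.get(dst)
    } yield ((src, srcAttr), (dst, dstAttr), attr)
}
```

For instance, one edge from vertex 1 (`fashion`) to vertex 2 (`style`) with property `u` prints as `((1,fashion),(2,style),u)`.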
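The labeling that `connectedComponents` produces in step 3 can be reproduced on a small example with plain Scala. The following sketch, with a hypothetical `ConnectedComponentsSketch` object, repeatedly propagates the smaller label across every edge (treated as undirected) until nothing changes, so each vertex ends up labeled with the lowest vertex ID in its component. It is a simple single-machine loop, not GraphX's distributed Pregel-based implementation, and it assumes every edge endpoint appears in the vertex set.

```scala
// Plain-Scala sketch (no Spark): min-label propagation for connected components.
// Assumes every edge endpoint is contained in vertexIds.
object ConnectedComponentsSketch {
  def components(vertexIds: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Start with every vertex labeled by its own ID
    var labels: Map[Long, Long] = vertexIds.map(id => id -> id).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((a, b) <- edges) {
        // Both endpoints adopt the smaller of their two labels
        val m = math.min(labels(a), labels(b))
        if (labels(a) != m) { labels = labels.updated(a, m); changed = true }
        if (labels(b) != m) { labels = labels.updated(b, m); changed = true }
      }
    }
    labels // vertexId -> lowest vertex ID in its component
  }
}
```

With edges (2,3), (3,1), and (4,5), vertices 1, 2, and 3 all receive label 1 even though vertices 1 and 2 never share an edge. This transitive linking is why hashtags in one large component can be only remotely related.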
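The counting in step 3 (countByValue over the component IDs, then picking the largest) amounts to a group-and-count. This plain-Scala sketch, with a hypothetical `TopComponentSketch` object, takes a map from vertex ID to component ID, counts the vertices per component, and returns the (componentId, count) pair with the highest count. As in the recipe's version, ties are broken arbitrarily, and an empty input throws an exception.

```scala
// Plain-Scala sketch (no Spark) of finding the largest component.
object TopComponentSketch {
  def topComponent(labels: Map[Long, Long]): (Long, Long) = {
    // componentId -> number of vertices carrying that component ID
    val counts: Map[Long, Long] =
      labels.values.groupBy(identity).map { case (cc, ids) => cc -> ids.size.toLong }
    counts.maxBy(_._2) // throws on an empty map
  }
}
```

For a labeling in which vertices 1, 2, and 3 carry component ID 1 and vertices 4 and 5 carry component ID 4, the result is `(1, 3)`.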
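The join-and-filter at the end of step 3 can also be sketched with plain Scala maps. In this sketch (hypothetical `TopComponentTagsSketch` object), one map plays the role of `graph.vertices` (vertex ID to hashtag) and the other plays `connectedComponents.vertices` (vertex ID to component ID). An inner join on the vertex ID pairs each hashtag with its component, and only the hashtags of the chosen component are kept. Sorting the result is an addition of the sketch, made only to keep the output deterministic.

```scala
// Plain-Scala sketch (no Spark) of joining hashtags to component IDs
// and keeping the hashtags of one component.
object TopComponentTagsSketch {
  def tagsOf(
      hashTags: Map[Long, String], // vertexId -> hashtag
      components: Map[Long, Long], // vertexId -> componentId
      topComponentId: Long
  ): Seq[String] =
    hashTags.toSeq
      // Inner join on vertexId: vertices missing from components are dropped
      .flatMap { case (vertexId, tag) => components.get(vertexId).map(cc => (tag, cc)) }
      // Keep only the hashtags of the requested component
      .collect { case (tag, cc) if cc == topComponentId => tag }
      .sorted
}
```

For example, with hashtags `fashion` (vertex 1), `style` (vertex 2), and `food` (vertex 4), where vertices 1 and 2 are in component 1 and vertex 4 is in component 4, asking for component 1 returns `fashion` and `style`.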

In this chapter, we briefly touched upon Spark Streaming, streaming machine learning, and GraphX. These recipes are by no means exhaustive; they aim only to give a taste of what Spark Streaming and GraphX can do.
