GraphX is Spark's API for graphs and graph-parallel computation. In this recipe, we will see a preview of what is possible with the GraphX component in Spark.
Now that we have the Twitter data stored in the ElasticSearch index, we will perform the following tasks on this data using a graph:
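import org.apache.spark.sql.SQLContext
// esDF is added to SQLContext by the elasticsearch-spark implicits
import org.elasticsearch.spark.sql._

def convertElasticSearchDataToDataFrame(sqlContext: SQLContext) = {
  val twStatusDf = sqlContext.esDF("spark/twstatus")
  twStatusDf
}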
Vertex RDD requires an RDD of a tuple representing a vertexId and a vertex property. In our case, we'll just do a primitive hash code on the hashTag as the vertex ID and hashTag itself as the property:
val verticesRdd: RDD[(Long, String)] = df.flatMap { tweet =>
  val hashTags = tweet.getAs[Buffer[String]]("hashTags")
  hashTags.map { tag =>
    val lowercaseTag = tag.toLowerCase()
    val tagHashCode = lowercaseTag.hashCode().toLong
    (tagHashCode, lowercaseTag)
  }
}
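The ID scheme above can be tried without Spark. The following is a minimal sketch in plain Scala; the object HashTagVertices and the helper vertexFor are hypothetical names introduced here for illustration and are not part of the recipe. It shows that lowercasing makes differently cased tags share one vertex, and that String.hashCode is only 32 bits wide, so two distinct strings can occasionally end up with the same ID.

```scala
object HashTagVertices {
  // Hypothetical helper (not in the recipe): lowercases the tag and widens
  // its 32-bit String.hashCode to the Long that GraphX uses as a vertex ID.
  def vertexFor(tag: String): (Long, String) = {
    val lowercaseTag = tag.toLowerCase
    (lowercaseTag.hashCode.toLong, lowercaseTag)
  }

  def main(args: Array[String]): Unit = {
    // "Fashion" and "FASHION" map to the same (id, tag) pair
    Seq("Fashion", "FASHION", "style").map(vertexFor).foreach(println)
    // Raw-string illustration of a hashCode collision: both hash to 2112
    println("Aa".hashCode == "BB".hashCode)
  }
}
```

For a preview like this recipe, an occasional collision only merges two tags into one vertex; a sturdier ID scheme would be needed where that matters.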
For the edges, we construct an RDD[Edge], which wraps a pair of vertex IDs and a property. In our case, we use the first URL (if present) as a property to the edge (we aren't using it for this recipe so an empty string should also be fine). Since there is a possibility of multiple hashtags for a tweet, we use the combinations function to choose pairs and then connect them together as an edge:
val edgesRdd: RDD[Edge[String]] = df.flatMap { row =>
  val hashTags = row.getAs[Buffer[String]]("hashTags")
  val urls = row.getAs[Buffer[String]]("urls")
  val topUrl = if (urls.length > 0) urls(0) else ""
  val combinations = hashTags.combinations(2)
  combinations.map { combs =>
    val firstHash = combs(0).toLowerCase().hashCode.toLong
    val secondHash = combs(1).toLowerCase().hashCode.toLong
    Edge(firstHash, secondHash, topUrl)
  }
}
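To see what combinations(2) contributes, here is a small sketch in plain Scala that runs without Spark. The object HashTagPairs and the helper pairs are hypothetical names used only for this illustration. Each pair it returns corresponds to one edge in the listing above.

```scala
object HashTagPairs {
  // Hypothetical helper (not in the recipe): every unordered pair of
  // lowercased hashtags found in a single tweet.
  def pairs(hashTags: Seq[String]): List[(String, String)] =
    hashTags
      .map(_.toLowerCase)
      .combinations(2)
      .map(pair => (pair(0), pair(1)))
      .toList

  def main(args: Array[String]): Unit = {
    // Three distinct tags produce three pairs
    pairs(Seq("Fashion", "Style", "OOTD")).foreach(println)
    // A tweet with a single hashtag produces no pairs, hence no edges
    println(pairs(Seq("solo")).isEmpty)
  }
}
```

A tweet with n distinct hashtags yields n(n-1)/2 pairs, so tweets that carry many hashtags contribute a disproportionate share of the edges.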
Finally, we construct the graph using both RDDs:
val graph=Graph(verticesRdd, edgesRdd)
graph.vertices.take(20).foreach(println)
The output is:
graph.edges.take(20).foreach(println)
The output is:
graph.triplets.take(20).foreach(println)
The output is:
GraphX's graph.connectedComponents returns a graph of all the vertices in which each vertex's property is its component ID (the lowest vertex ID in the component that the vertex belongs to):
val connectedComponents=graph.connectedComponents.cache()
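As an illustration of what the component IDs mean, the following plain Scala sketch labels every vertex with the smallest vertex ID reachable from it. This is a sequential toy version written for this explanation, not GraphX's distributed implementation; the object MinLabelComponents and its components method are hypothetical names, and the sketch assumes that every edge endpoint is present in the vertex set and that edges are treated as undirected.

```scala
object MinLabelComponents {
  // Hypothetical sketch: repeatedly push the smaller label across each edge
  // until nothing changes. Assumes every edge endpoint is in `vertices`.
  def components(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Every vertex starts out labelled with its own ID
    var labels: Map[Long, Long] = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((a, b) <- edges) {
        val smallest = math.min(labels(a), labels(b))
        if (labels(a) != smallest || labels(b) != smallest) {
          labels = labels.updated(a, smallest).updated(b, smallest)
          changed = true
        }
      }
    }
    labels
  }

  def main(args: Array[String]): Unit = {
    val vertices = Set(1L, 2L, 3L, 10L, 11L, 42L)
    val edges = Seq((2L, 3L), (1L, 2L), (10L, 11L))
    // Prints each vertex ID with the component ID it ends up with
    components(vertices, edges).toSeq.sorted.foreach(println)
  }
}
```

Labels only ever decrease, so the loop terminates; an isolated vertex such as 42 keeps its own ID and forms a component of size one.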
Let's take the component ID with the maximum number of vertices and then extract the vertices (and eventually the hashtags) that belong to that component:
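// countByValue returns a scala.collection.Map, hence this import
import scala.collection._

// The vertex property in connectedComponents is the component ID
val ccCounts: Map[VertexId, Long] =
  connectedComponents.vertices.map { case (_, vertexId) => vertexId }.countByValue

// Get the top component Id and count
val topComponent: (VertexId, Long) =
  ccCounts.toSeq.sortBy { case (componentId, count) => count }.reverse.head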
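The same counting step can be sketched on a local collection. The object TopComponent and its largest method below are hypothetical names for illustration; the input map from vertex ID to component ID stands in for connectedComponents.vertices. The sketch uses maxBy rather than sorting and reversing, which is an alternative to the recipe's approach and avoids sorting the whole sequence.

```scala
object TopComponent {
  // Hypothetical helper: given vertex ID -> component ID, return the
  // component ID with the most vertices together with its vertex count.
  // Throws on an empty map, because maxBy has nothing to choose from.
  def largest(componentOf: Map[Long, Long]): (Long, Long) = {
    val counts: Map[Long, Long] =
      componentOf.values
        .groupBy(identity)
        .map { case (componentId, members) => componentId -> members.size.toLong }
    counts.maxBy { case (_, count) => count }
  }

  def main(args: Array[String]): Unit = {
    val componentOf = Map(1L -> 1L, 2L -> 1L, 3L -> 1L, 10L -> 10L, 11L -> 10L, 42L -> 42L)
    println(largest(componentOf))
  }
}
```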
Since topComponent just has the component ID, in order to fetch the hashTags of the top component, we need to have a representation that maps hashTag to a component ID. This is achieved by joining the graph's vertices to the connectedComponents vertices:
// RDD of HashTag-Component Id pair. Joins using vertexId
val hashtagComponentRdd: VertexRDD[(String, VertexId)] =
  graph.vertices.innerJoin(connectedComponents.vertices) {
    case (vertexId, hashTag, componentId) => (hashTag, componentId)
  }
Now that we have componentId and hashTag, let's filter only the hashTags for the top component ID:
val topComponentHashTags = hashtagComponentRdd
  .filter { case (vertexId, (hashTag, componentId)) => (componentId == topComponent._1) }
  .map { case (vertexId, (hashTag, componentId)) => hashTag }
topComponentHashTags
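The join followed by the filter can be pictured with two local maps keyed by vertex ID. This is only an analogy in plain Scala, not the VertexRDD API; the object TopComponentTags and its tagsIn method are hypothetical names introduced for this sketch.

```scala
object TopComponentTags {
  // Hypothetical local analogue of innerJoin + filter + map: keep the tags
  // whose vertex ID is assigned to the requested component.
  def tagsIn(tags: Map[Long, String],
             componentOf: Map[Long, Long],
             componentId: Long): Set[String] =
    tags.toSeq.collect {
      case (vertexId, tag) if componentOf.get(vertexId).contains(componentId) => tag
    }.toSet

  def main(args: Array[String]): Unit = {
    val tags = Map(1L -> "fashion", 2L -> "style", 10L -> "travel")
    val componentOf = Map(1L -> 1L, 2L -> 1L, 10L -> 10L)
    // Only the tags that belong to component 1 are kept
    println(tagsIn(tags, componentOf, 1L))
  }
}
```

Vertices that are missing from either map are dropped, which mirrors the inner-join behaviour described above.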
The entire method looks like this:
def getHashTagsOfTopConnectedComponent(graph: Graph[String, String]): RDD[String] = {
  // Get all the connected components
  val connectedComponents = graph.connectedComponents.cache()

  import scala.collection._

  val ccCounts: Map[VertexId, Long] =
    connectedComponents.vertices.map { case (_, vertexId) => vertexId }.countByValue

  // Get the top component Id and count
  val topComponent: (VertexId, Long) =
    ccCounts.toSeq.sortBy { case (componentId, count) => count }.reverse.head

  // RDD of HashTag-Component Id pair. Joins using vertexId
  val hashtagComponentRdd: VertexRDD[(String, VertexId)] =
    graph.vertices.innerJoin(connectedComponents.vertices) {
      case (vertexId, hashTag, componentId) => (hashTag, componentId)
    }

  // Filter the vertices that belong to the top component alone
  val topComponentHashTags = hashtagComponentRdd
    .filter { case (vertexId, (hashTag, componentId)) => (componentId == topComponent._1) }
    .map { case (vertexId, (hashTag, componentId)) => hashTag }

  topComponentHashTags
}
Saving the hashTags to a file is as simple as calling saveAsTextFile. The repartition(1) is done just so that we have a single output file (Spark writes topTags.txt as a directory that holds that one part file). Alternatively, you could use collect() to bring all the data to the driver and inspect it:

def saveTopTags(topTags: RDD[String]): Unit = {
  topTags.repartition(1).saveAsTextFile("topTags.txt")
}
The number of hashtags in the top connected component for our run was 7,320. This shows that in our sample stream there are about 7,320 tags related to fashion that are interrelated. They could be synonyms, closely related, or remotely related to fashion. A snapshot of the file looks like this:
In this chapter, we briefly touched upon Spark Streaming, Streaming ML, and GraphX. Please note that this is by no means an exhaustive list of recipes for these topics; it aims only to give a taste of what Streaming and GraphX in Spark can do.