- Start the Spark shell:
$ spark-shell
- Import the GraphX-related classes:
scala> import org.apache.spark.graphx._
- Load the edges from HDFS:
scala> val edgesFile = sc.textFile(
"hdfs://localhost:9000/user/hduser/data/cc/edges.csv")
- Convert the edgesFile RDD into the RDD of edges:
scala> val edges = edgesFile.map(_.split(",")).map(e =>
Edge(e(0).toLong,e(1).toLong,e(2)))
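The two-step map above assumes every line of edges.csv has exactly three well-formed comma-separated fields; a malformed row would throw at runtime inside the job. A defensive variant could be sketched as follows (the parseEdge helper name is ours, not part of the recipe, and plain Scala is used here so the logic can be read in isolation):

```scala
import scala.util.Try

// Hypothetical helper (not in the original recipe): parse one
// "srcId,dstId,attribute" line, returning None for malformed rows
// instead of throwing.
def parseEdge(line: String): Option[(Long, Long, String)] = {
  line.split(",").map(_.trim) match {
    case Array(src, dst, attr) =>
      // toLong throws on non-numeric input, so wrap it in Try
      Try((src.toLong, dst.toLong, attr)).toOption
    case _ => None // wrong number of fields
  }
}
```

Inside the shell this could then replace the bare split, e.g. `edgesFile.flatMap(parseEdge).map { case (s, d, a) => Edge(s, d, a) }`, silently dropping bad rows rather than failing the job.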
- Load the vertices from HDFS:
scala> val verticesFile = sc.textFile(
"hdfs://localhost:9000/user/hduser/data/cc/nodes.csv")
- Map the vertices:
scala> val vertices = verticesFile.map(_.split(",")).map( e =>
(e(0).toLong,e(1)))
- Create the graph object:
scala> val graph = Graph(vertices,edges)
- Calculate the connected components using graph.connectedComponents:
scala> val cc = graph.connectedComponents
- Extract the vertices of the result (connectedComponents returns a graph whose vertex attribute is the component ID):
scala> val ccVertices = cc.vertices
- Print the ccVertices:
scala> ccVertices.collect.foreach(println)
(4,4)
(6,4)
(2,1)
(1,1)
(3,1)
(5,4)
As you can see in the output, vertices 1, 2, and 3 are labeled with component 1, while vertices 4, 5, and 6 are labeled with component 4. In each case, the component ID is the lowest vertex ID in that cluster.
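The behavior above can be reproduced without Spark: conceptually, every vertex repeatedly adopts the smallest ID seen among itself and its neighbors until no label changes, which is why each component ends up labeled with its lowest vertex ID. The following is a minimal plain-Scala sketch of that idea, not GraphX's actual Pregel-based implementation; the edge list used is an assumption consistent with the output shown (edges.csv itself is not reproduced in the recipe):

```scala
object CCSketch {
  // Label-propagation sketch of connected components: each vertex's
  // label converges to the smallest vertex ID reachable from it.
  def connectedComponents(vertices: Seq[Long],
                          edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Start with every vertex labeled by its own ID.
    var labels = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((src, dst) <- edges) {
        // Both endpoints adopt the smaller of their two labels.
        val m = math.min(labels(src), labels(dst))
        if (labels(src) != m || labels(dst) != m) {
          labels = labels + (src -> m) + (dst -> m)
          changed = true
        }
      }
    }
    labels
  }
}
```

Running it on a graph with the assumed edges 1-2, 2-3, 4-5, and 5-6 yields the same labeling as the ccVertices output above: vertices 1-3 map to 1, and vertices 4-6 map to 4.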