How to do it...

To get started with the recipe, we first need to perform the following steps: 

  1. Start spark-shell with the graphframes package:
        $ spark-shell --packages graphframes:graphframes:0.2.0-spark2.0-s_2.11
If you get an error downloading slf4j, use the following command to fetch it explicitly:
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.4:get -DartifactId=slf4j-api -DgroupId=org.slf4j -Dversion=1.7.7
  2. Do the necessary imports:
        scala> import org.graphframes._
  3. Load the vertex DataFrame:
        scala> val vertices = spark.sqlContext.createDataFrame(List(
                 ("sc","Santa Clara","CA"),
                 ("fr","Fremont","CA"),
                 ("sf","San Francisco","CA"))).toDF("id","city","state")
  4. Load the edge DataFrame:
        scala> val edges = spark.sqlContext.createDataFrame(List(
                 ("sc","fr",20),
                 ("fr","sf",44),
                 ("sf","sc",53))).toDF("src","dst","distance")
  5. Create a GraphFrame:
        scala> val g = GraphFrame(vertices, edges)
  6. Show all vertices:
        scala> g.vertices.show
  7. Show all edges:
        scala> g.edges.show
  8. Get the in-degree of each vertex:
        scala> g.inDegrees.show

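Under the hood, inDegrees simply counts how many edges point at each vertex. As a plain-Scala sanity check on the same toy edge list (no Spark needed; a sketch of the idea, not the GraphFrames implementation):

```scala
// Toy edge list from the recipe: (src, dst, distance)
val edges = List(("sc", "fr", 20), ("fr", "sf", 44), ("sf", "sc", 53))

// In-degree = number of edges whose destination is the vertex;
// this mirrors what g.inDegrees returns as a DataFrame
val inDegrees = edges.groupBy(_._2).map { case (dst, es) => dst -> es.size }

// In this three-vertex cycle every vertex has exactly one incoming edge
println(inDegrees)
```

Since sc, fr, and sf form a directed cycle, each vertex gets an in-degree of 1, which matches what `g.inDegrees.show` displays.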
Now that we have played with a toy dataset, let's do some exercises on a real one. We are going to use the California road network dataset from the Stanford Network Analysis Project (SNAP) at Stanford (http://snap.stanford.edu/data):

  1. Start spark-shell with the graphframes package:
        $ spark-shell --packages graphframes:graphframes:0.2.0-spark2.0-s_2.11
  2. Do the necessary imports:
        scala> import org.graphframes._
  3. Load the road data (tab-separated):
        scala> val edges = spark.read.option("delimiter","\t")
                 .option("header","true").option("inferSchema","true")
                 .csv("s3a://sparkcookbook/roads/ca")
  4. Each row is a pair of intersection/endpoint IDs, so to create the vertex DataFrame we collect the distinct IDs that appear at either end of an edge:
        scala> val vertices = edges.select("src").union(edges.select("dst")).distinct.toDF("id")
  5. Create a GraphFrame:
        scala> val g = GraphFrame(vertices, edges)
  6. One would expect that in California all roads are connected to each other. Let's see how many connected components there are (this step is best run on Databricks Cloud or EMR, as it needs some heavy-duty computation):
        scala> val cc = g.connectedComponents.run
  7. Print the cc schema:
        scala> cc.printSchema
root
|-- id: integer (nullable = true)
|-- component: long (nullable = true)
  8. We are interested in knowing how many connected components there are, so let's select the component column and count the distinct values:
        scala> cc.select("component").distinct.count
        res7: Long = 2638

So the value comes to 2638: contrary to what we would expect, not every road in California is connected to every other. It looks like freeways are not included in this dataset, which would explain the 2,638 clusters.
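The connectedComponents algorithm groups together vertices that are mutually reachable, ignoring edge direction. A minimal plain-Scala sketch of the same idea using union-find, on a hypothetical five-vertex edge list that forms two separate clusters (not the SNAP data):

```scala
// Hypothetical edges forming two separate clusters: {1,2,3} and {4,5}
val edges = List((1, 2), (2, 3), (4, 5))
val vertices = edges.flatMap { case (a, b) => List(a, b) }.distinct

// Simple union-find: each vertex starts as its own parent;
// find follows parent links to the root of a component
val parent = scala.collection.mutable.Map(vertices.map(v => v -> v): _*)
def find(v: Int): Int = if (parent(v) == v) v else find(parent(v))

// Union the endpoints of every edge
edges.foreach { case (a, b) => parent(find(a)) = find(b) }

// Number of distinct roots = number of connected components,
// analogous to cc.select("component").distinct.count
val componentCount = vertices.map(find).distinct.size
println(componentCount)
```

On the California road network this same counting logic yields 2638 components; GraphFrames uses a distributed algorithm rather than this sequential sketch, but the result it computes is the same notion of component.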
