Let's analyze flight data by representing airports as vertices and routes as edges. We will run some basic graph analytics to find the airports with the most departures and arrivals, and then use the Pregel API to find the cheapest fares. Download the flight data from the following location:
http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time
The steps to analyze the data are as follows:
Unzip the downloaded file and copy the CSV file to HDFS:
unzip 355968671_T_ONTIME.zip
hadoop fs -put 355968671_T_ONTIME.csv
Start the shell with the spark-shell command and then import all dependencies, as follows:
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
Create a case class for the flight schema corresponding to the CSV data file:
scala> case class Flight(org_id: Long, origin: String, dest_id: Long, dest: String, dist: Float)
Define a function that parses a CSV line into the Flight class:
scala> def parseFlightCsv(str: String): Flight = {
         val line = str.split(",")
         Flight(line(0).toLong, line(1), line(2).toLong, line(3), line(4).toFloat)
       }
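The parser above throws an exception on any line that is short or non-numeric, and it keeps any double quotes that surround a field (the quoted codes in later output suggest the file has them). The following is a minimal sketch of a more forgiving variant, not part of the original recipe; unquote and parseFlightCsvSafe are hypothetical names introduced here for illustration.

```scala
import scala.util.Try

object FlightParsing {
  // Same fields as the Flight case class defined in the text.
  case class Flight(org_id: Long, origin: String, dest_id: Long, dest: String, dist: Float)

  // Hypothetical helper: trims whitespace and one pair of surrounding double quotes.
  def unquote(field: String): String =
    field.trim.stripPrefix("\"").stripSuffix("\"")

  // Hypothetical variant of parseFlightCsv: returns None instead of throwing
  // when a line has too few fields or a value that is not numeric.
  def parseFlightCsvSafe(str: String): Option[Flight] = {
    val line = str.split(",").map(unquote)
    Try(Flight(line(0).toLong, line(1), line(2).toLong, line(3), line(4).toFloat)).toOption
  }
}
```

Assuming this variant were used in the shell, mapping with flatMap instead of map would silently skip unparseable lines, including a header line. Whether skipping bad rows is acceptable depends on the analysis.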
Create csvRDD by reading the input file and then remove the header:
scala> val csvRDD = sc.textFile("355968671_T_ONTIME.csv")
scala> val noHdrRDD = csvRDD.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
scala> val flightsRDD = noHdrRDD.map(parseFlightCsv)
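The header removal relies on the header being the first line of partition 0, which holds when a single file is read from its beginning. The following sketch applies the same rule to in-memory sequences that stand in for partitions; HeaderDrop and dropHeader are hypothetical names, and no Spark is involved.

```scala
object HeaderDrop {
  // Applies the same rule as the mapPartitionsWithIndex call in the text to
  // in-memory "partitions": drop one line from partition 0, keep all others.
  def dropHeader(partitions: Seq[Seq[String]]): Seq[String] =
    partitions.zipWithIndex.flatMap { case (lines, idx) =>
      val iter = lines.iterator
      if (idx == 0) iter.drop(1) else iter
    }
}
```

Only the first partition loses a line, so data lines that happen to start later partitions are left untouched.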
Define the airports as vertices, along with a default vertex attribute called nowhere. Map the airport ID to the three-letter code for lookups later:
scala> val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
scala> airports.take(3)
res26: Array[(Long, String)] = Array((14122,"PIT"), (10141,"ABR"), (13158,"MAF"))
scala> val default = "nowhere"
scala> val airportMap = airports.map { case (org_id, name) => org_id -> name }.collect.toMap
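Because airportMap is built from origin airports only, a direct lookup such as airportMap(id) throws NoSuchElementException for an ID that never appears as an origin. The following sketch shows a lookup that falls back to the default value instead; the three map entries reuse IDs and codes shown elsewhere in this section, and AirportLookup and code are hypothetical names.

```scala
object AirportLookup {
  val default = "nowhere"

  // Hypothetical three-entry stand-in for the collected airportMap.
  val airportMap: Map[Long, String] =
    Map(10397L -> "ATL", 13930L -> "ORD", 11298L -> "DFW")

  // Returns the airport code, or the default for an unknown ID.
  def code(id: Long): String = airportMap.getOrElse(id, default)
}
```

With this helper, AirportLookup.code(10397L) gives ATL and an unknown ID gives nowhere, mirroring how the graph itself treats missing vertices.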
Create the edges from the distinct routes, with the distance as the edge attribute:
scala> val flightRoutes = flightsRDD.map(flight => ((flight.org_id, flight.dest_id), flight.dist)).distinct
scala> val edges = flightRoutes.map { case ((org_id, dest_id), distance) => Edge(org_id, dest_id, distance) }
scala> edges.take(1) // Array(Edge(10299,10926,160))
Create the graph from the airports, the edges, and the default vertex attribute:
scala> val graph = Graph(airports, edges, default)
Find the three airports with the most incoming routes:
scala> val maxIncoming = graph.inDegrees.collect.sortWith(_._2 > _._2).map(x => (airportMap(x._1), x._2)).take(3)
scala> maxIncoming.foreach(println)
(ATL,152)
(ORD,145)
(DFW,143)
Find the three airports with the most outgoing routes:
scala> val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending=false).take(3)
scala> maxout.foreach(println)
(10397,(153,ATL))
(13930,(146,ORD))
(11298,(143,DFW))
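The degree queries above count edges per vertex, and since the edges were built from distinct routes, the counts are routes rather than individual flights. The following sketch reproduces the same counting on a small in-memory route list using plain Scala collections; the route data and the names DegreeCount, outDegrees, inDegrees, and top are hypothetical.

```scala
object DegreeCount {
  // Hypothetical route list: (origin ID, destination ID) pairs.
  val routes: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))

  // Out-degree: number of routes leaving each airport.
  def outDegrees(rs: Seq[(Long, Long)]): Map[Long, Int] =
    rs.groupBy(_._1).map { case (id, group) => id -> group.size }

  // In-degree: number of routes arriving at each airport.
  def inDegrees(rs: Seq[(Long, Long)]): Map[Long, Int] =
    rs.groupBy(_._2).map { case (id, group) => id -> group.size }

  // Top n airports by degree, highest first.
  def top(degrees: Map[Long, Int], n: Int): Seq[(Long, Int)] =
    degrees.toSeq.sortBy { case (_, degree) => -degree }.take(n)
}
```

In this toy data, airport 1 has the highest out-degree (2) and airport 3 has the highest in-degree (2). As with GraphX degree RDDs, an airport with no routes in a given direction does not appear in that result at all.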
Find the three longest routes by distance:
scala> graph.triplets.sortBy(_.attr, ascending=false).map(triplet => "The distance from " + triplet.srcAttr + " to " + triplet.dstAttr + " is " + triplet.attr.toInt + ".").take(3).foreach(println)
The distance from "JFK" to "HNL" is 4983.
The distance from "HNL" to "JFK" is 4983.
The distance from "EWR" to "HNL" is 4962.
Many important graph algorithms are iterative, since the properties of a vertex depend on the properties of its neighbors, which in turn depend on the properties of their neighbors. Pregel is an iterative graph processing model, developed at Google, that uses a sequence of message-passing iterations between the vertices of a graph. GraphX implements a Pregel-like bulk-synchronous message-passing API.
With the Pregel implementation in GraphX, vertices can only send messages to neighboring vertices.
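The Pregel operator is executed in a series of super steps. In each super step, every vertex receives the combined inbound messages from the previous super step, computes a new value for its vertex property, and sends messages to its neighboring vertices for the next super step.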
When there are no more messages remaining, the Pregel operator will end the iteration and the final graph is returned. The following code computes the cheapest airfare using the Pregel API:
scala> val sourceId: VertexId = 13024 // starting vertex
Create a graph whose edge attribute is the airfare, calculated as 50 + distance / 20:
scala> val gg = graph.mapEdges(e => 50.toDouble + e.attr.toDouble/20 )
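To make the fare model concrete, the following sketch expresses the same formula as a plain function; FareModel and fare are hypothetical names used only for illustration.

```scala
object FareModel {
  // Fare model from the text: a flat 50 plus the distance divided by 20.
  def fare(distance: Double): Double = 50.0 + distance / 20.0
}
```

For the sample edge shown earlier, with a distance of 160, this gives 50 + 160 / 20 = 58.0. The flat 50 means that every additional leg adds at least 50 to a route's total, so the cheapest path search tends to prefer routes with fewer legs.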
Initialize the graph so that every vertex except the source has a distance of infinity:
scala> val initialGraph = gg.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
Now, call pregel on the graph:
scala> val sssp = initialGraph.pregel(Double.PositiveInfinity)(
         // Vertex Program
         (id, dist, newDist) => math.min(dist, newDist),
         // Send Message
         triplet => {
           if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
             Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
           } else {
             Iterator.empty
           }
         },
         // Merge Message
         (a, b) => math.min(a, b)
       )
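The message passing performed by the pregel call can be traced by hand on a tiny graph. The following is a simplified plain-Scala simulation, not GraphX itself: it evaluates every edge in each round, whereas GraphX only considers edges next to vertices that received a message. The four-vertex graph and the names PregelSketch and cheapestFrom are hypothetical.

```scala
object PregelSketch {
  // Hypothetical directed graph: (source, destination, fare).
  val edges: Seq[(Long, Long, Double)] = Seq(
    (1L, 2L, 60.0), (2L, 3L, 55.0), (1L, 3L, 130.0), (3L, 4L, 52.0)
  )

  def cheapestFrom(sourceId: Long, es: Seq[(Long, Long, Double)]): Map[Long, Double] = {
    val ids = es.flatMap { case (s, d, _) => Seq(s, d) }.distinct
    // Initial state: 0.0 at the source, infinity everywhere else.
    var dist: Map[Long, Double] =
      ids.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap
    var active = true
    while (active) {
      // Send: an edge emits a message only if it offers its destination a cheaper fare.
      val sent = es.collect {
        case (s, d, w) if dist(s) + w < dist(d) => d -> (dist(s) + w)
      }
      // Merge: keep the smallest message addressed to each vertex.
      val merged = sent.groupBy(_._1).map { case (id, msgs) => id -> msgs.map(_._2).min }
      // Vertex program: keep the smaller of the current value and the merged message.
      dist = dist.map { case (id, cur) =>
        id -> math.min(cur, merged.getOrElse(id, Double.PositiveInfinity))
      }
      // Stop once a round produces no messages.
      active = merged.nonEmpty
    }
    dist
  }
}
```

Starting from vertex 1, the first round sets vertex 2 to 60.0 and vertex 3 to 130.0. The second round lowers vertex 3 to 115.0 through vertex 2 and sets vertex 4 to 182.0, the third lowers vertex 4 to 167.0, and the fourth sends no messages, so the loop ends.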
Now, print a few routes with their fares:
scala> println(sssp.edges.take(4).mkString(" "))
Edge(10135,10397,84.6) Edge(10135,13930,82.7) Edge(10140,10397,113.45) Edge(10140,10821,133.5)
Find the routes with airport codes and the lowest flight cost:
scala> sssp.edges.map { case Edge(org_id, dest_id, price) => (airportMap(org_id), airportMap(dest_id), price) }.takeOrdered(10)(Ordering.by(_._3))
Array((WRG,PSG,51.55), (PSG,WRG,51.55), (CEC,ACV,52.8), (ACV,CEC,52.8), (ORD,MKE,53.35), (IMT,RHI,53.35), (MKE,ORD,53.35), (RHI,IMT,53.35), (STT,SJU,53.4), (SJU,STT,53.4))
Print a few airports with their lowest total fare from the source airport:
scala> println(sssp.vertices.take(4).mkString(" "))
(10208,277.79) (10268,260.7) (14828,261.65) (14698,125.25)
Find airport codes sorted by the lowest flight cost:
scala> sssp.vertices.collect.map(x => (airportMap(x._1), x._2)).sortWith(_._2 < _._2)
res21: Array[(String, Double)] = Array((PDX,62.05), (SFO,65.75), (EUG,117.35))