For this flights sample scenario, we will make use of two sets of data: airport codes and flight departure delays. We will create two DataFrames, airports and departureDelays, which will make up the vertices and edges of our GraphFrame, respectively. We will be creating this flights sample application using Python.
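The following minimal plain-Python sketch makes the vertex/edge split concrete. It uses a few hypothetical airports and trips that are not taken from the real datasets. GraphFrames expects the vertices DataFrame to have an id column and the edges DataFrame to have src and dst columns. The Airport and Trip record types below are illustrative stand-ins for those rows, not part of any library.

```python
from typing import NamedTuple


# Hypothetical record types standing in for rows of the vertex and edge DataFrames.
class Airport(NamedTuple):
    id: str      # IATA code; GraphFrames vertices are keyed by an `id` column
    city: str
    state: str


class Trip(NamedTuple):
    src: str     # origin IATA code (GraphFrames edges use `src` ...)
    dst: str     # destination IATA code (... and `dst`)
    delay: int   # departure delay


# Vertices: one entry per airport, indexed by its IATA code (toy data).
vertices = {
    a.id: a
    for a in [
        Airport("SEA", "Seattle", "WA"),
        Airport("SFO", "San Francisco", "CA"),
        Airport("JFK", "New York", "NY"),
    ]
}

# Edges: one entry per trip between two airports (toy data).
edges = [Trip("SEA", "SFO", 5), Trip("SFO", "JFK", -3), Trip("SEA", "JFK", 12)]

# In a well-formed graph, every edge endpoint is a known vertex.
assert all(e.src in vertices and e.dst in vertices for e in edges)

# A simple graph question: which cities are reachable directly from Seattle?
reachable = sorted(vertices[e.dst].city for e in edges if e.src == "SEA")
print(reachable)  # ['New York', 'San Francisco']
```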
As we are using a Databricks notebook for our example, we can make use of the /databricks-datasets/ location, which contains numerous sample datasets. You can also download the data from:

departureDelays.csv: http://bit.ly/2ejPr8k
airportCodes: http://bit.ly/2ePAdKT

In this example, we create two variables holding the file paths for our airports and departure delays data, respectively. We then load these datasets and create the corresponding Spark DataFrames. Both files include a header row. For the airports file we also let Spark infer the schema, while the departure delays columns are read as strings and cast explicitly later on:
```python
# `spark` is the SparkSession that Databricks notebooks provide

# Set file paths
tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Obtain airports dataset
# Note: this dataset is tab-delimited with a header
airportsna = spark.read.csv(airportsnaFilePath, header=True, inferSchema=True, sep='\t')
airportsna.createOrReplaceTempView("airports_na")

# Obtain departure delays dataset
# Note: this dataset is comma-delimited with a header
departureDelays = spark.read.csv(tripdelaysFilePath, header=True)
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()
```
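The following sketch shows why the delimiter and schema options matter, using Python's standard csv module on two tiny hand-written snippets. The snippets only mimic the layout of the real files. The column names follow the ones used in the queries in this section, and the data rows are illustrative. Because departureDelays is read without inferSchema, Spark treats every column as a string, much as csv.DictReader does. That keeps the leading zero of a date value such as 01011245, which the substring logic later in this section relies on.

```python
import csv
import io

# Illustrative snippets mimicking the two file layouts (not the real files).
airports_txt = "City\tState\tCountry\tIATA\nSeattle\tWA\tUSA\tSEA\n"
delays_csv = "date,delay,distance,origin,destination\n01011245,6,602,ABE,ATL\n"

# Tab-delimited file: the reader must be told to split on "\t".
airport_rows = list(csv.DictReader(io.StringIO(airports_txt), delimiter="\t"))

# Comma-delimited file: the default delimiter works.
delay_rows = list(csv.DictReader(io.StringIO(delays_csv)))

print(airport_rows[0]["IATA"])  # SEA
print(delay_rows[0]["date"])    # 01011245 (a string, so the leading zero survives)

# With the wrong delimiter, each line collapses into a single column.
wrong = list(csv.DictReader(io.StringIO(airports_txt), delimiter=","))
print(list(wrong[0].keys()))    # ['City\tState\tCountry\tIATA']
```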
Once we have loaded the departureDelays DataFrame, we also cache it so that the additional filtering we apply next can reuse it efficiently:
```python
# Available IATA codes from the departuredelays sample dataset
tripIATA = spark.sql("""
    select distinct iata
    from (
        select distinct origin as iata from departureDelays
        union all
        select distinct destination as iata from departureDelays
    ) a
""")
tripIATA.createOrReplaceTempView("tripIATA")
```
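For intuition, the following sketch reproduces the same union-then-deduplicate logic in plain Python over a few hypothetical (origin, destination) pairs. In SQL, a plain UNION (without ALL) would also remove duplicates. Here, a Python set union plays that role. Note that BOS appears only as a destination, which is why both columns have to be included.

```python
# Hypothetical trip records: (origin, destination) pairs.
trips = [("SEA", "SFO"), ("SFO", "JFK"), ("SEA", "JFK"), ("JFK", "BOS")]

# Distinct codes from each side of the trip.
origins = {o for o, _ in trips}
destinations = {d for _, d in trips}

# Set union keeps every code seen on either side, each exactly once.
trip_iata = origins | destinations

print(sorted(trip_iata))  # ['BOS', 'JFK', 'SEA', 'SFO']
```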
The query that creates tripIATA builds a distinct list of the IATA codes that appear as either an origin or a destination (for example, Seattle = 'SEA', San Francisco = 'SFO', New York JFK = 'JFK', and so on). Next, we include only the airports that had at least one trip in the departureDelays DataFrame:
```python
# Only include airports with at least one trip from the
# `departureDelays` dataset
airports = spark.sql("""
    select f.IATA, f.City, f.State, f.Country
    from airports_na f
    join tripIATA t on t.IATA = f.IATA
""")
airports.createOrReplaceTempView("airports")
airports.cache()
```
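Because tripIATA contains each code only once, the inner join acts as a filter on the airport list. It does not duplicate any rows. The following plain-Python sketch illustrates this with hypothetical airport rows. A code that appears in trips but not in the airport file (JFK in this toy data) simply produces no output row, as with an inner join.

```python
# Hypothetical rows from the airport-codes file: (IATA, City, State, Country).
airports_na = [
    ("SEA", "Seattle", "WA", "USA"),
    ("SFO", "San Francisco", "CA", "USA"),
    ("YXX", "Abbotsford", "BC", "Canada"),  # no trips in this toy data
]

# Distinct codes seen in trips (toy stand-in for the tripIATA view).
trip_iata = {"SEA", "SFO", "JFK"}

# Keep only airports whose code appears in at least one trip.
airports = [row for row in airports_na if row[0] in trip_iata]

print([row[0] for row in airports])  # ['SEA', 'SFO']
```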
By building the distinct list of airport codes, we can restrict the airports DataFrame to the airport codes that exist in the departureDelays dataset. The following code snippet generates a new DataFrame, departureDelays_geo, made up of key attributes including the flight date, delay, distance, and airport information (origin and destination):
```python
# Build `departureDelays_geo` DataFrame
# Obtain key attributes such as date of flight, delays, distance,
# and airport information (origin, destination)
# `date` is a string in MMddHHmm form (e.g. '01011245'); it is rebuilt
# as '2014-MM-dd HH:mm:00' and cast to a timestamp
departureDelays_geo = spark.sql("""
    select
        cast(f.date as int) as tripid,
        cast(concat('2014-', substr(f.date, 1, 2), '-', substr(f.date, 3, 2),
                    ' ', substr(f.date, 5, 2), ':', substr(f.date, 7, 2), ':00')
             as timestamp) as localdate,
        cast(f.delay as int) as delay,
        cast(f.distance as int) as distance,
        f.origin as src,
        f.destination as dst,
        o.city as city_src,
        d.city as city_dst,
        o.state as state_src,
        d.state as state_dst
    from departureDelays f
    join airports o on o.iata = f.origin
    join airports d on d.iata = f.destination
""")

# Create temporary view and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()
```
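The timestamp logic is easier to follow in plain Python. Judging by the substring positions, date holds MMddHHmm. The SQL prepends the hard-coded year 2014 and inserts a space between the date and time parts before casting. The helper names to_local_timestamp and to_trip_id below are hypothetical and only mirror the two SQL expressions. Python slices are 0-based, whereas SQL substr positions start at 1.

```python
from datetime import datetime


def to_local_timestamp(date_str: str, year: int = 2014) -> datetime:
    """Mirror the SQL concat/substr logic: 'MMddHHmm' -> 'YYYY-MM-dd HH:mm:00'."""
    month, day = date_str[0:2], date_str[2:4]    # substr(date, 1, 2), substr(date, 3, 2)
    hour, minute = date_str[4:6], date_str[6:8]  # substr(date, 5, 2), substr(date, 7, 2)
    text = f"{year}-{month}-{day} {hour}:{minute}:00"
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def to_trip_id(date_str: str) -> int:
    """Mirror cast(date as int): the leading zero is dropped."""
    return int(date_str)


sample = "01011245"  # illustrative value in MMddHHmm form
print(to_local_timestamp(sample))  # 2014-01-01 12:45:00
print(to_trip_id(sample))          # 1011245
```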
To take a quick peek at this data, you can run the show method as shown here:
```python
# Review the top 10 rows of the `departureDelays_geo` DataFrame
departureDelays_geo.show(10)
```