Preparing your flights dataset

For this flights sample scenario, we will make use of two sets of data:

  • Airline On-Time Performance and Causes of Flight Delays: [http://bit.ly/2ccJPPM] This dataset contains scheduled and actual departure and arrival times, and delay causes as reported by US air carriers. The data is collected by the Office of Airline Information, Bureau of Transportation Statistics (BTS).
  • Open Flights: Airports and airline data: [http://openflights.org/data.html] This dataset contains a list of US airports, including the IATA code, airport name, and airport location.

We will create two DataFrames, airports and departureDelays, which will make up the vertices and edges of our GraphFrame, respectively. We will be creating this flights sample application using Python.

As we are using a Databricks notebook for our example, we can make use of the /databricks-datasets/ location, which contains numerous sample datasets. You can also download the data from:

In this example, we create two variables denoting the file paths for our airports and departure delays data, respectively. We then load these datasets and create the respective Spark DataFrames. Both files have a header row; note that the code infers the schema only for the airports file, so the departure delays columns are read as strings:

# Set File Paths
tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Obtain airports dataset
# Note, this dataset is tab-delimited with a header
airportsna = spark.read.csv(airportsnaFilePath, header='true', inferSchema='true', sep='\t')
airportsna.createOrReplaceTempView("airports_na")

# Obtain departure Delays data
# Note, this dataset is comma-delimited with a header
departureDelays = spark.read.csv(tripdelaysFilePath, header='true')
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()

Once we have loaded the departureDelays DataFrame, we also cache it so that the additional filtering that follows performs well:

# Available IATA codes from the departuredelays sample dataset
tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

The preceding query builds a distinct list of IATA codes for every airport that appears as an origin or a destination (for example, Seattle = 'SEA', San Francisco = 'SFO', New York JFK = 'JFK', and so on). Next, we only include airports that had at least one trip in the departureDelays DataFrame:

# Only include airports with at least one trip from the
# `departureDelays` dataset
airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()
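
The two queries above amount to a set union followed by a filtering join. As a rough illustration of that logic outside Spark, here is a minimal plain-Python sketch. The rows are invented toy values, not records from the real files, and the names trips, airports_na_rows, and trip_iata are illustrative assumptions only:

```python
# Toy stand-ins for the two datasets; these rows are invented for illustration.
trips = [
    {"origin": "SEA", "destination": "SFO"},
    {"origin": "SFO", "destination": "JFK"},
    {"origin": "SEA", "destination": "JFK"},
]
airports_na_rows = [
    {"IATA": "SEA", "City": "Seattle", "State": "WA", "Country": "USA"},
    {"IATA": "SFO", "City": "San Francisco", "State": "CA", "Country": "USA"},
    {"IATA": "JFK", "City": "New York", "State": "NY", "Country": "USA"},
    {"IATA": "YVR", "City": "Vancouver", "State": "BC", "Country": "Canada"},
]

# Counterpart of tripIATA: every code seen as an origin or a destination.
trip_iata = {t["origin"] for t in trips} | {t["destination"] for t in trips}

# Counterpart of the join: keep only airports whose code appears in trip_iata.
airports = [a for a in airports_na_rows if a["IATA"] in trip_iata]

print(sorted(trip_iata))
print([a["IATA"] for a in airports])
```

In this toy data, the YVR row is dropped because no trip starts or ends there, which is the same effect the inner join on tripIATA has on airports_na.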

By building the distinct list of airport codes, we restrict the airports DataFrame to only the airport codes that exist in the departureDelays dataset. The following code snippet generates a new DataFrame (departureDelays_geo) that comprises key attributes including date of flight, delays, distance, and airport information (origin, destination):

# Build `departureDelays_geo` DataFrame
# Obtain key attributes such as Date of flight, delays, distance,
# and airport information (Origin, Destination).
# `localdate` is assembled as '2014-MM-dd HH:mm:00' before the timestamp cast;
# the space between the date and time parts is required for the cast to succeed.
departureDelays_geo = spark.sql("""
  select cast(f.date as int) as tripid,
    cast(concat('2014-', substr(f.date, 1, 2), '-', substr(f.date, 3, 2), ' ',
      substr(f.date, 5, 2), ':', substr(f.date, 7, 2), ':00') as timestamp) as `localdate`,
    cast(f.delay as int) as delay, cast(f.distance as int) as distance,
    f.origin as src, f.destination as dst,
    o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst
  from departureDelays f
    join airports o on o.iata = f.origin
    join airports d on d.iata = f.destination
""")

# Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()
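
The localdate expression in the query is easier to follow outside SQL. The sketch below mirrors the same substring slicing in plain Python, assuming the date column holds eight-character MMddHHmm strings, as the substr offsets in the query suggest. The helper name parse_trip_date is hypothetical, and the default year of 2014 is taken from the literal used in the query:

```python
from datetime import datetime


def parse_trip_date(raw: str, year: int = 2014) -> datetime:
    """Convert an 'MMddHHmm' string into a datetime, assuming the given year."""
    if len(raw) != 8 or not raw.isdigit():
        raise ValueError(f"expected 8 digits (MMddHHmm), got {raw!r}")
    # Same slices as substr(date, 1, 2), substr(date, 3, 2), and so on in the SQL.
    month, day, hour, minute = raw[0:2], raw[2:4], raw[4:6], raw[6:8]
    # The space between the date and time parts is what makes the text parseable.
    text = f"{year}-{month}-{day} {hour}:{minute}:00"
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# A departure on January 1 at 12:45.
print(parse_trip_date("01011245"))

# Converting to an integer first drops the leading zero, which would shift
# every substring offset by one position.
print(str(int("01011245")))
```

This is one reason to keep the column as a string when slicing it: an integer form of a date in January through September has only seven digits.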

To take a quick peek at the departureDelays_geo data, you can run the show method as shown here:

# Review the top 10 rows of the `departureDelays_geo` DataFrame
departureDelays_geo.show(10)