Loading and transforming the data

Even though MLlib is designed with RDDs and DStreams in mind, for ease of transforming the data we will read it in and convert it to a DataFrame.

Note

DStreams are the basic data abstraction for Spark Streaming (see http://bit.ly/2jIDT2A).

Just like in the previous chapter, we first specify the schema of our dataset.

Note

Note that here, for brevity, we present only a handful of features. You should always check this book's GitHub repository for the latest version of the code: https://github.com/drabastomek/learningPySpark.

Here's the code:

import pyspark.sql.types as typ
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ...
    ('INFANT_BREASTFED', typ.StringType())
]
schema = typ.StructType([
        typ.StructField(e[0], e[1], False) for e in labels
    ])

Next, we load the data. The .read.csv(...) method can read either uncompressed or (as in our case) GZipped comma-separated values. The header parameter set to True indicates that the first row contains the header, and we use the schema to specify the correct data types:

births = spark.read.csv('births_train.csv.gz', 
                        header=True, 
                        schema=schema)
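If you want to confirm that the schema was applied as expected, a quick optional sanity check (ours, not part of the book's flow) is to print it:

# Optional sanity check: list the column names and the types we specified
births.printSchema()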

There are plenty of features in our dataset that are strings. These are mostly categorical variables that we need to somehow convert to a numeric form.

Tip

You can glimpse over the original file schema specification here: ftp://ftp.cdc.gov/pub/Health_Statistics/NCHS/Dataset_Documentation/DVS/natality/UserGuide2015.pdf.

We will first specify our recode dictionary:

recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}
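The outer key ('YNU') names the recoding scheme, and the inner dictionary maps the raw string values to integers; a plain Python lookup, shown here only to illustrate the mapping, behaves as follows:

# Plain dictionary lookups -- 'U' (Unknown) is deliberately treated as 'N'
recode_dictionary['YNU']['Y']  # 1
recode_dictionary['YNU']['U']  # 0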

Our goal in this chapter is to predict whether 'INFANT_ALIVE_AT_REPORT' is 1 or 0. Thus, we will drop all of the features that relate to the infant and will try to predict the infant's chances of survival based only on the features related to its mother, father, and the place of birth:

selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]
births_trimmed = births.select(selected_features)

In our dataset, there are plenty of features with Yes/No/Unknown values; we will code only Yes to 1, and everything else will be set to 0.

There is also a small problem with how the number of cigarettes smoked by the mother was coded: 0 means the mother smoked no cigarettes before or during the pregnancy, 1-97 states the actual number of cigarettes smoked, 98 indicates either 98 or more, whereas 99 identifies the unknown. We will assume the unknown is 0 and recode accordingly.

So next we will specify our recoding methods:

import pyspark.sql.functions as func

def recode(col, key):
    return recode_dictionary[key][col]

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat)) \
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())

The recode method looks up the passed value (col) in the recode_dictionary (under the given key) and returns the corrected value. The correct_cig method checks whether the value of the feature feat is not equal to 99 and, for that situation, returns the value of the feature; if the value is equal to 99, it returns 0 instead.
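To see the .when(...).otherwise(...) pattern in isolation, here is a minimal, self-contained sketch; the toy DataFrame below is ours, not part of the book's dataset:

# A toy column where 99 stands for "unknown" and should become 0
df = spark.createDataFrame([(0,), (12,), (99,)], ['CIG_BEFORE'])
df.select(
    func.when(func.col('CIG_BEFORE') != 99, func.col('CIG_BEFORE'))
        .otherwise(0)
        .alias('CIG_BEFORE')
).show()
# Expected: 0 and 12 pass through unchanged, 99 becomes 0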

We cannot use the recode function directly on a DataFrame; it needs to be converted to a UDF that Spark will understand. The rec_integer is such a function: by passing our specified recode function and specifying the return value data type, we can then use it to encode our Yes/No/Unknown features.

So, let's get to it. First, we'll correct the features related to the number of cigarettes smoked:

births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE')) \
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI')) \
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI')) \
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

The .withColumn(...) method takes the name of the column as its first parameter and the transformation as the second one. In the preceding cases, we do not create new columns, but reuse the same ones instead.
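Had we passed a column name that does not exist yet, .withColumn(...) would have appended a new column instead; here is a hypothetical illustration (the 'CIG_TOTAL' name is ours, not in the dataset):

# Hypothetical: 'CIG_TOTAL' is not in the dataset, so it gets appended
births_cig = births_transformed.withColumn(
    'CIG_TOTAL',
    func.col('CIG_1_TRI') + func.col('CIG_2_TRI') + func.col('CIG_3_TRI')
)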

Now we will focus on correcting the Yes/No/Unknown features. First, we will figure out which ones these are with the following snippet:

cols = [(col.name, col.dataType) for col in births_trimmed.schema]
YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

        if 'Y' in dis:
            YNU_cols.append(s[0])

First, we create a list of tuples (cols) that holds the column names and corresponding data types. Next, we loop through all of these and calculate the distinct values of all the string columns; if a 'Y' is within the returned list, we append the column name to the YNU_cols list.
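If you prefer it more compact, the same check can be written as a single list comprehension; this is an equivalent sketch of ours, not the book's code:

# Equivalent sketch: keep string columns whose distinct values include 'Y'
YNU_cols = [
    name for name, dtype in cols
    if dtype == typ.StringType()
    and 'Y' in [row[0] for row in births.select(name).distinct().collect()]
]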

DataFrames can transform features in bulk while selecting them. To present the idea, consider the following example:

births.select([
        'INFANT_NICU_ADMISSION', 
        rec_integer(
            'INFANT_NICU_ADMISSION', func.lit('YNU')
        ).alias('INFANT_NICU_ADMISSION_RECODE')
    ]).take(5)

Here's what we get in return:

[Screenshot: the first five rows, showing 'INFANT_NICU_ADMISSION' alongside 'INFANT_NICU_ADMISSION_RECODE']

We select the 'INFANT_NICU_ADMISSION' column and pass the name of the feature to the rec_integer method. We also alias the newly transformed column as 'INFANT_NICU_ADMISSION_RECODE'. This way, we also confirm that our UDF works as intended.

So, to transform all the YNU_cols in one go, we will create a list of such transformations, as shown here:

exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x) 
    if x in YNU_cols 
    else x 
    for x in births_transformed.columns
]
births_transformed = births_transformed.select(exprs_YNU)

Let's check if we got it correctly:

births_transformed.select(YNU_cols[-5:]).show(5)

Here's what we get:

[Screenshot: the .show(5) output of the last five Yes/No/Unknown columns, now recoded to 0/1]
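If you prefer a check that does not rely on eyeballing the printed rows, one extra verification (ours, not the book's) is to list the distinct values of a recoded column; it should now contain only 0 and 1:

# Every recoded column should now contain only 0 and 1
births_transformed.select('DIABETES_PRE').distinct().show()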

It looks like everything worked as expected, so let's get to know our data better.
