Even though MLlib is designed with RDDs and DStreams in focus, for ease of transforming the data we will read it in and convert it to a DataFrame.
DStreams are the basic data abstraction for Spark Streaming (see http://bit.ly/2jIDT2A).
Just like in the previous chapter, we first specify the schema of our dataset.
Note that here (for brevity), we only present a handful of features. You should always check the GitHub repository for this book for the latest version of the code: https://github.com/drabastomek/learningPySpark.
import pyspark.sql.types as typ

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ...
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])
Next, we load the data. The .read.csv(...) method can read either uncompressed or (as in our case) GZipped comma-separated values. The header parameter set to True indicates that the first row contains the header, and we use the schema to specify the correct data types:
births = spark.read.csv('births_train.csv.gz', header=True, schema=schema)
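As an optional sanity check (not part of the original example), you can confirm that the schema was applied as intended and see how many records were loaded:

# Optional sanity check: inspect the applied schema and the row count.
births.printSchema()
print(births.count())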
There are plenty of features in our dataset that are strings. These are mostly categorical variables that we need to somehow convert to a numeric form.
You can glance over the original file schema specification here: ftp://ftp.cdc.gov/pub/Health_Statistics/NCHS/Dataset_Documentation/DVS/natality/UserGuide2015.pdf.
We will first specify our recode dictionary:
recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}
Our goal in this chapter is to predict whether the 'INFANT_ALIVE_AT_REPORT' feature is either 1 or 0. Thus, we will drop all of the features that relate to the infant and will try to predict the infant's chances of surviving based only on the features related to its mother, father, and the place of birth:
selected_features = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_BEFORE',
    'CIG_1_TRI',
    'CIG_2_TRI',
    'CIG_3_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT',
    'MOTHER_WEIGHT_GAIN',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)
In our dataset, there are plenty of features with Yes/No/Unknown values; we will only code Yes to 1; everything else will be set to 0.
There is also a small problem with how the number of cigarettes smoked by the mother was coded: 0 means the mother smoked no cigarettes before or during the pregnancy, 1-97 states the actual number of cigarettes smoked, 98 indicates either 98 or more, whereas 99 identifies the unknown; we will assume the unknown is 0 and recode accordingly.
So next we will specify our recoding methods:
import pyspark.sql.functions as func

def recode(col, key):
    return recode_dictionary[key][col]

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat)) \
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())
The recode method looks up the correct value in the recode_dictionary (given the key) and returns the corrected value. The correct_cig method checks whether the value of the feature feat is not equal to 99 and, for that situation, returns the value of the feature; if the value is equal to 99, we get 0 instead.
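As a quick illustration (our own addition, not part of the book's code), you can call the plain Python recode function directly to see the mapping it implements; the results follow from the recode_dictionary defined earlier:

# Plain-Python check of the recode logic (illustrative only):
print(recode('Y', 'YNU'))  # 1
print(recode('N', 'YNU'))  # 0
print(recode('U', 'YNU'))  # 0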
We cannot use the recode function directly on a DataFrame; it needs to be converted to a UDF that Spark will understand. The rec_integer is such a function: by passing our specified recode function and specifying the return value data type, we can then use it to encode our Yes/No/Unknown features.
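To see the UDF in action before applying it to our data, here is a minimal sketch (the throwaway DataFrame and the column name flag are made up for this illustration; it assumes the spark session and the definitions above):

# A minimal sketch: apply rec_integer to a tiny test DataFrame.
test_df = spark.createDataFrame([('Y',), ('N',), ('U',)], ['flag'])
test_df.select(
    'flag',
    rec_integer('flag', func.lit('YNU')).alias('flag_recoded')
).show()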
So, let's get to it. First, we'll correct the features related to the number of cigarettes smoked:
births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE')) \
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI')) \
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI')) \
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))
The .withColumn(...) method takes the name of the column as its first parameter and the transformation as the second one. In the previous cases, we do not create new columns, but rather reuse the same ones.
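To make that distinction concrete, here is a small sketch of our own (the column name CIG_BEFORE_ORIG is hypothetical): passing an existing column name overwrites that column, while passing a new name appends a column to the DataFrame:

# Overwrites the existing column in place:
births_trimmed.withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))

# Appends a new column instead (hypothetical name, for illustration only):
births_trimmed.withColumn('CIG_BEFORE_ORIG', func.col('CIG_BEFORE'))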
Now we will focus on correcting the Yes/No/Unknown features. First, we will figure out which ones they are with the following snippet:
cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

        if 'Y' in dis:
            YNU_cols.append(s[0])
First, we created a list of tuples (cols) that holds the column names and corresponding data types. Next, we loop through all of these and calculate the distinct values of all string columns; if a 'Y' is within the returned list, we append the column name to the YNU_cols list.
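Note that dropping down to the RDD API here is mostly a matter of style; an equivalent sketch of our own that stays within the DataFrame API would replace the dis = ... assignment inside the loop with:

# Equivalent approach without the RDD hop:
# .collect() on a DataFrame returns Row objects, so we unpack the first field.
dis = [row[0] for row in births.select(s[0]).distinct().collect()]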
DataFrames can transform features in bulk while selecting them. To present the idea, consider the following example:
births.select([
    'INFANT_NICU_ADMISSION',
    rec_integer(
        'INFANT_NICU_ADMISSION', func.lit('YNU')
    ).alias('INFANT_NICU_ADMISSION_RECODE')
]).take(5)
Here's what we get in return:
We select the 'INFANT_NICU_ADMISSION' column and we pass the name of the feature to the rec_integer method. We also alias the newly transformed column as 'INFANT_NICU_ADMISSION_RECODE'. This way, we will also confirm that our UDF works as intended.
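If you want a fuller check than eyeballing five rows, one option (our own addition, not in the original text) is to cross-tabulate the original and recoded columns; every value other than 'Y' should map to 0:

# Cross-tabulate original vs. recoded values to verify the mapping.
births.select(
    'INFANT_NICU_ADMISSION',
    rec_integer(
        'INFANT_NICU_ADMISSION', func.lit('YNU')
    ).alias('INFANT_NICU_ADMISSION_RECODE')
).groupBy(
    'INFANT_NICU_ADMISSION', 'INFANT_NICU_ADMISSION_RECODE'
).count().show()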
So, to transform all the YNU_cols in one go, we will create a list of such transformations, as shown here:
exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x)
    if x in YNU_cols
    else x
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)
Let's check if we got it correctly:
births_transformed.select(YNU_cols[-5:]).show(5)
Here's what we get:
Looks like everything worked as we wanted, so let's get to know our data better.