Chapter 4. Prepare Data for Modeling

All data is dirty, irrespective of what the source of the data might lead you to believe: it might be your colleague, a telemetry system that monitors your environment, a dataset you download from the web, or some other source. Until you have tested your data and proven to yourself that it is in a clean state (we will get to what a clean state means in a moment), you should neither trust it nor use it for modeling.

Your data can be stained with duplicates, missing observations and outliers, non-existent addresses, wrong phone numbers and area codes, inaccurate geographical coordinates, wrong dates, incorrect labels, mixtures of upper and lower cases, trailing spaces, and many other more subtle problems. It is your job to clean it, irrespective of whether you are a data scientist or data engineer, so you can build a statistical or machine learning model.

Your dataset is considered technically clean if none of the aforementioned problems can be found. However, to clean the dataset for modeling purposes, you also need to check the distributions of your features and confirm they fit the predefined criteria.

As a data scientist, you can expect to spend 80-90% of your time massaging your data and getting familiar with all the features. This chapter will guide you through that process, leveraging Spark capabilities.

In this chapter, you will learn how to do the following:

  • Recognize and handle duplicates, missing observations, and outliers
  • Calculate descriptive statistics and correlations
  • Visualize your data with matplotlib and Bokeh

Checking for duplicates, missing observations, and outliers

Until you have fully tested the data and proven it worthy of your time, you should neither trust it nor use it. In this section, we will show you how to deal with duplicates, missing observations, and outliers.

Duplicates

Duplicates are observations that appear as distinct rows in your dataset, but which, upon closer inspection, look the same. That is, if you looked at them side by side, all the features in these two (or more) rows would have exactly the same values.

On the other hand, if your data has some form of an ID to distinguish between records (or associate them with certain users, for example), then what might initially appear as a duplicate may not be; sometimes systems fail and produce erroneous IDs. In such a situation, you need to either check whether the same ID is a real duplicate, or you need to come up with a new ID system.

Consider the following example:

df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])

As you can see, we have several issues here:

  • We have two rows with IDs equal to 3 and they are exactly the same
  • Rows with IDs 1 and 4 are the same — the only thing that's different is their IDs, so we can safely assume that they are the same person
  • We have two rows with IDs equal to 5, but that seems to be a recording issue, as they do not seem to be the same person

This is a very easy dataset with only seven rows. What do you do when you have millions of observations? The first thing I normally do is to check if I have any duplicates: I compare the count of the full dataset with the count I get after running the .distinct() method:

print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))

Here's what you get back for our DataFrame:

Count of rows: 7
Count of distinct rows: 6

If these two numbers differ, then you know you have what I like to call pure duplicates: rows that are exact copies of each other. We can drop these rows by using the .dropDuplicates(...) method:

df = df.dropDuplicates()

Our dataset will then look as follows (once you run df.show()):

[Output: the deduplicated DataFrame; one of the two identical rows with ID 3 is gone, leaving six rows]

We dropped one of the rows with ID 3. Now let's check whether there are any duplicates in the data irrespective of ID. We can quickly repeat what we have done earlier, but using only columns other than the ID column:

print('Count of ids: {0}'.format(df.count()))
print('Count of distinct ids: {0}'.format(
    df.select([
        c for c in df.columns if c != 'id'
    ]).distinct().count())
)

We should see one more row that is a duplicate:

Count of ids: 6
Count of distinct ids: 5

We can still use the .dropDuplicates(...) method, but this time we will add the subset parameter, which restricts the check to columns other than the id column:

df = df.dropDuplicates(subset=[
    c for c in df.columns if c != 'id'
])

The subset parameter instructs the .dropDuplicates(...) method to look for duplicated rows using only the columns it lists; in the preceding example, we drop records that share the same weight, height, age, and gender, ignoring the id. Running df.show(), we get the following cleaner dataset, as the row with id = 1 has been dropped because it was identical to the record with id = 4:

[Output: the DataFrame now has five rows; the row with id = 1 has been dropped]

Now that we know there are no full rows duplicated, or any identical rows differing only by ID, let's check if there are any duplicated IDs. To calculate the total and distinct number of IDs in one step, we can use the .agg(...) method:

import pyspark.sql.functions as fn

df.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct')
).show()

Here's the output of the preceding code:

[Output: count = 5, distinct = 4]

In the preceding example, we first import the pyspark.sql.functions module and alias it as fn.

Tip

This gives us access to a vast array of functions, too many to list here. However, we strongly encourage you to study the PySpark documentation at http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#module-pyspark.sql.functions.

Next, we use .count(...) and .countDistinct(...) to calculate, respectively, the number of rows and the number of distinct ids in our DataFrame. The .alias(...) method allows us to specify a friendly name for the returned column.

As you can see, we have five rows in total, but only four distinct IDs. Since we have already dropped all the duplicates, we can safely assume that this might just be a fluke in our ID data, so we will give each row a unique ID:

df.withColumn('new_id', fn.monotonically_increasing_id()).show()

The preceding code snippet produced the following output:

[Output: the DataFrame with an additional new_id column holding a unique, but not consecutive, ID for each row]

The .monotonically_increasing_id() method gives each record a unique and increasing ID. According to the documentation, as long as your data is put into fewer than roughly 1 billion partitions with fewer than 8 billion records in each, the ID is guaranteed to be unique.

Note

A word of caution: in earlier versions of Spark, the .monotonically_increasing_id() method would not necessarily return the same IDs across multiple evaluations of the same DataFrame. This, however, has been fixed in Spark 2.0.
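
If you want to double-check that the generated IDs are indeed unique, you can reuse the count-versus-distinct-count pattern from earlier. The following is a minimal sketch (not part of the original example), assuming the df DataFrame and the fn alias from the preceding snippets:

# A minimal sketch: verify the generated IDs are unique by comparing
# the total row count with the distinct count of the new column
df_new_id = df.withColumn('new_id', fn.monotonically_increasing_id())

df_new_id.agg(
    fn.count('new_id').alias('count'),
    fn.countDistinct('new_id').alias('distinct')
).show()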

Missing observations

You will frequently encounter datasets with blanks in them. Missing values can happen for a variety of reasons: system failures, human error, changes in the data schema, just to name a few.

The simplest way to deal with missing values, if your data can afford it, is to drop the whole observation whenever any value is missing. You have to be careful not to drop too many: depending on how the missing values are distributed across your dataset, dropping rows might severely limit its usability. If, after dropping the rows, I end up with a very small dataset, or find that the reduction in data size exceeds 50%, I start checking which features have the most holes in them and perhaps exclude those altogether; a feature with most of its values missing (unless a missing value bears a meaning) is, from a modeling point of view, fairly useless.
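
A quick way to gauge how much data a blanket drop would cost you is to compare the row counts before and after calling .dropna(...). Here is a minimal sketch (not part of the original example), assuming a DataFrame with missing values such as the df_miss one created a little further below:

# A minimal sketch: measure how many rows (and what fraction of the data)
# a blanket .dropna() would remove before committing to it
total = df_miss.count()
complete = df_miss.dropna().count()  # drops rows with any missing value

print('Rows dropped: {0} ({1:.1%} of the data)'.format(
    total - complete, (total - complete) / float(total)
))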

The other way to deal with the observations with missing values is to impute some value in place of those Nones. Given the type of your data, you have several options to choose from:

  • If your data is a discrete Boolean, you can turn it into a categorical variable by adding a third category — Missing
  • If your data is already categorical, you can simply extend the number of levels and add the Missing category as well
  • If you're dealing with ordinal or numerical data, you can impute either the mean, the median, or some other predefined value (for example, the first or third quartile, depending on the distribution shape of your data); see the short sketch after this list
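
As an illustration of the last option, here is a minimal sketch (not part of the original example) that imputes the median of the age column with the .approxQuantile(...) method, which we will meet again later in this chapter; it assumes the df_miss DataFrame created just below:

# A minimal sketch: impute the median of the 'age' column instead of
# the mean; drop the nulls first so they do not affect the quantile
age_median = df_miss.dropna(subset=['age']) \
    .approxQuantile('age', [0.5], 0.01)[0]

df_miss.fillna({'age': age_median}).show()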

Consider a similar example to the one we presented previously:

df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28,   'M',  100000),
        (2, 167.2, 5.4, 45,   'M',  None),
        (3, None , 5.2, None, None, None),
        (4, 144.5, 5.9, 33,   'M',  None),
        (5, 133.2, 5.7, 54,   'F',  None),
        (6, 124.1, 5.2, None, 'F',  None),
        (7, 129.2, 5.3, 42,   'M',  76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])

In our example, we deal with several categories of missing values.

Analyzing rows, we see the following:

  • The row with ID 3 has only one useful piece of information—the height
  • The row with ID 6 has only one missing value apart from income—the age

Analyzing columns, we can see the following:

  • The income column, since it is a very personal thing to disclose, has most of its values missing
  • The weight and gender columns have only one missing value each
  • The age column has two missing values

To find the number of missing observations per row, we can use the following snippet:

df_miss.rdd.map(
    lambda row: (row['id'], sum([c is None for c in row]))
).collect()

It produces the following output:

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

It tells us that, for example, the row with ID 3 has four missing observations, as we observed earlier.

Let's see what values are missing so that when we count missing observations in columns, we can decide whether to drop the observation altogether or impute some of the observations:

df_miss.where('id == 3').show()

Here's what we get:

[Output: the single row with ID 3, with null in the weight, age, gender, and income columns]

Let's now check what percentage of observations is missing in each column:

df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()

This generates the following output:

[Output: the fraction of missing values per column: 14.29% for weight and gender, 28.57% for age, 71.43% for income, and 0% for id and height]

Note

The * argument to the .count(...) method (in place of a column name) instructs the method to count all rows. On the other hand, the * preceding the list declaration instructs the .agg(...) method to treat the list as a set of separate parameters passed to the function.

So, we have 14% of observations missing in the weight and gender columns, twice as many in the age column, and almost 72% of missing observations in the income column. Now we know what to do.

First, we will drop the 'income' feature, as most of its values are missing:

df_miss_no_income = df_miss.select([
    c for c in df_miss.columns if c != 'income'
])

We now see that we do not need to drop the row with ID 3: the 'weight' and 'age' columns have enough observations (in our simplified example) to calculate their means and impute them in place of the missing values.

However, if you decide to drop the observations instead, you can use the .dropna(...) method, as shown here. We will also use its thresh parameter, which specifies the minimum number of non-missing values a row needs in order to be kept; any row with fewer non-null values is dropped. This is useful if you have a dataset with tens or hundreds of features and you only want to drop those rows that exceed a certain threshold of missing values:

df_miss_no_income.dropna(thresh=3).show()

The preceding code produces the following output:

[Output: the DataFrame without the row with ID 3, which had only two non-missing values]

On the other hand, if you wanted to impute the observations, you can use the .fillna(...) method. This method accepts a single integer (long is also accepted), float, or string; all missing values in columns of a matching type will then be filled in with that value. You can also pass a dictionary of the form {'<colName>': <value_to_impute>}; the dictionary values have the same limitation, in that they can only be an integer, a float, or a string.
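
For instance, here is a minimal sketch (not part of the original example) of both forms, using arbitrary placeholder constants:

# A minimal sketch: fill missing values in all numeric columns with a
# single constant, or fill selected columns via a dictionary
# (the constants used here are arbitrary placeholders)
df_miss_no_income.fillna(0).show()
df_miss_no_income.fillna({'gender': 'missing', 'age': 40}).show()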

If you want to impute a mean, median, or other calculated value, you need to first calculate the value, create a dictionary with such values, and then pass it to the .fillna(...) method.

Here's how we do it:

means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c) 
        for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()

The preceding code will produce the following output:

[Output: the DataFrame with missing weight and age values replaced by the column means and missing gender values replaced by 'missing']

We omit the gender column, as one cannot calculate the mean of a categorical variable.

We use a double conversion here. Taking the output of the .agg(...) method (a PySpark DataFrame), we first convert it into a pandas DataFrame and then once more into a dictionary.

Tip

Note that calling .toPandas() can be problematic, as the method works essentially in the same way as .collect() in RDDs: it collects all the data from the workers and brings it over to the driver. This is unlikely to be a problem with the preceding aggregation, which returns a single row, unless you have thousands upon thousands of features.
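
If you prefer to avoid the trip through pandas altogether, one alternative (a sketch, not the approach used in this chapter) is to take the single aggregated Row with .first() and turn it into a dictionary with its .asDict() method:

# A minimal sketch: build the same dictionary of means without pandas,
# using the single Row returned by .first() and its .asDict() method
means_row = df_miss_no_income.agg(
    *[fn.mean(c).alias(c)
      for c in df_miss_no_income.columns if c != 'gender']
).first()

means_alt = means_row.asDict()
means_alt['gender'] = 'missing'

df_miss_no_income.fillna(means_alt).show()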

Passing 'records' to the .to_dict(...) method of pandas instructs it to produce a list of per-row dictionaries; taking the first (and only) element gives us the following dictionary:

[Output: a dictionary mapping each column name to its mean, for example 'age': 40.4, plus 'gender': 'missing']

Since we cannot calculate the average (or any other numeric metric) of a categorical variable, we added the missing category to the dictionary for the gender feature. Note that, even though the mean of the age column is 40.40, when imputed, the type of the df_miss_no_income.age column was preserved—it is still an integer.
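
If you would rather keep the fractional part of the imputed mean, one option (a sketch, not something the original example does) is to cast the column to a floating-point type before filling it:

# A minimal sketch: cast 'age' to double first so the imputed mean of
# 40.4 is not truncated to an integer when filled in
df_age_double = df_miss_no_income.withColumn(
    'age', df_miss_no_income['age'].cast('double')
)

df_age_double.fillna(means).show()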

Outliers

Outliers are observations that deviate significantly from the distribution of the rest of your sample. The definitions of significance vary but, in the most general form, you can accept that there are no outliers if all the values fall roughly within the Q1−1.5IQR and Q3+1.5IQR range, where IQR is the interquartile range; the IQR is defined as the difference between the upper and lower quartiles, that is, the 75th percentile (Q3) and the 25th percentile (Q1), respectively.

Let's, again, consider a simple example:

df_outliers = spark.createDataFrame([
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])

Now we can use the definition we outlined previously to flag the outliers.

First, we calculate the lower and upper cut-off points for each feature. We will use the .approxQuantile(...) method. The first parameter is the name of the column, the second parameter can be either a number between 0 and 1 (where 0.5 means to calculate the median) or a list (as in our case), and the third parameter specifies the acceptable level of error for each metric (if set to 0, it will calculate an exact value for the metric, but doing so can be really expensive):

cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(
        col, [0.25, 0.75], 0.05
    )
    
    IQR = quantiles[1] - quantiles[0]
    
    bounds[col] = [
        quantiles[0] - 1.5 * IQR,
        quantiles[1] + 1.5 * IQR
    ]

The bounds dictionary holds the lower and upper bounds for each feature:

[Output: the bounds dictionary with a [lower, upper] pair for each of the weight, height, and age columns]

Let's now use it to flag our outliers:

outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) | 
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()

The preceding code produces the following output:

[Output: a DataFrame of Boolean flags; only the row with ID 3 is flagged True, in both the weight_o and age_o columns]

We have one outlier in the weight feature and one in the age feature. By now you should know how to extract these, but here is a snippet that lists the values significantly differing from the rest of the distribution:

df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()

The preceding code will give you the following output:

[Output: two small tables listing the outlying values: a weight of 342.3 and an age of 99, both from the row with ID 3]

Equipped with the methods described in this section, you can quickly clean up even the biggest of datasets.
