All data is dirty, irrespective of what the source of the data might lead you to believe: it might be your colleague, a telemetry system that monitors your environment, a dataset you download from the web, or some other source. Until you have tested and proven to yourself that your data is in a clean state (we will get to what clean state means in a second), you should neither trust it nor use it for modeling.
Your data can be stained with duplicates, missing observations and outliers, non-existent addresses, wrong phone numbers and area codes, inaccurate geographical coordinates, wrong dates, incorrect labels, mixtures of upper and lower cases, trailing spaces, and many other more subtle problems. It is your job to clean it, irrespective of whether you are a data scientist or data engineer, so you can build a statistical or machine learning model.
Your dataset is considered technically clean if none of the aforementioned problems can be found. However, to clean the dataset for modeling purposes, you also need to check the distributions of your features and confirm they fit the predefined criteria.
As a data scientist, you can expect to spend 80-90% of your time massaging your data and getting familiar with all the features. This chapter will guide you through that process, leveraging Spark capabilities.
In this chapter, you will learn how to recognize and handle duplicates, missing observations, and outliers in your data.
Until you have fully tested the data and proven it worthy of your time, you should neither trust it nor use it. In this section, we will show you how to deal with duplicates, missing observations, and outliers.
Duplicates are observations that appear as distinct rows in your dataset, but which, upon closer inspection, look the same. That is, if you looked at them side by side, all the features in these two (or more) rows would have exactly the same values.
On the other hand, if your data has some form of an ID to distinguish between records (or associate them with certain users, for example), then what might initially appear as a duplicate may not be; sometimes systems fail and produce erroneous IDs. In such a situation, you need to either check whether the same ID is a real duplicate, or you need to come up with a new ID system.
Consider the following example:
df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
As you can see, we have several issues here:
- There are two rows with ID 3, and they are exactly the same
- Rows with IDs 1 and 4 are the same; the only thing that differs is their IDs, so we can safely assume that they are the same person
- There are two rows with ID 5, but that seems to be a recording issue, as they do not appear to be the same person

This is a very easy dataset with only seven rows. What do you do when you have millions of observations? The first thing I normally do is check for duplicates: I compare the count of the full dataset with the count I get after running the .distinct() method:
print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))
Here's what you get back for our DataFrame:
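With the seven-row DataFrame defined above, the row with ID 3 appears twice, so the two counts should differ by one:

Count of rows: 7
Count of distinct rows: 6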
If these two numbers differ, then you know you have what I like to call pure duplicates: rows that are exact copies of each other. We can drop these rows by using the .dropDuplicates(...) method:
df = df.dropDuplicates()
Our dataset will then look as follows (once you run df.show()):
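Assuming the drop removed the second copy of the row with ID 3, the result should look roughly like this (row order after .dropDuplicates() is not guaranteed):

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+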
We dropped one of the rows with ID 3. Now let's check whether there are any duplicates in the data irrespective of ID. We can quickly repeat what we did earlier, but using only the columns other than the ID column:
print('Count of ids: {0}'.format(df.count()))
print('Count of distinct ids: {0}'.format(
    df.select([
        c for c in df.columns if c != 'id'
    ]).distinct().count())
)
We should see one more row that is a duplicate:
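With the six remaining rows, of which the rows with IDs 1 and 4 differ only by their ID, the counts should be:

Count of ids: 6
Count of distinct ids: 5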
We can still use the .dropDuplicates(...) method, but we will add the subset parameter that specifies only the columns other than the id column:
df = df.dropDuplicates(subset=[
    c for c in df.columns if c != 'id'
])
The subset parameter instructs the .dropDuplicates(...) method to look for duplicated rows using only the columns specified via the subset parameter; in the preceding example, we will drop the duplicated records with the same weight, height, age, and gender, but not id. Running df.show(), we get the following cleaner dataset, as we dropped the row with id = 1 since it was identical to the record with id = 4:
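Assuming the record with ID 4 was the one retained (which of the two identical rows survives is arbitrary, and row order is not guaranteed), the result should look roughly like this:

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+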
Now that we know there are no fully duplicated rows, nor any identical rows differing only by ID, let's check whether there are any duplicated IDs. To calculate the total and distinct number of IDs in one step, we can use the .agg(...) method:
import pyspark.sql.functions as fn

df.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct')
).show()
Here's the output of the preceding code:
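For our data this should report five rows but only four distinct IDs, roughly as follows:

+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+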
In the previous example, we first import the pyspark.sql.functions module (aliased as fn). This gives us access to a vast array of functions, too many to list here. However, we strongly encourage you to study the PySpark documentation at http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#module-pyspark.sql.functions.
Next, we use the .count(...) and .countDistinct(...) functions to calculate, respectively, the number of rows and the number of distinct ids in our DataFrame. The .alias(...) method allows us to specify a friendly name for the returned column.
As you can see, we have five rows in total, but only four distinct IDs. Since we have already dropped all the duplicates, we can safely assume that this is just a fluke in our ID data, so we will give each row a unique ID instead:
df.withColumn('new_id', fn.monotonically_increasing_id()).show()
The preceding code snippet produced the following output:
The .monotonically_increasing_id() method gives each record a unique and increasing ID. According to the documentation, as long as your data is put into fewer than roughly 1 billion partitions with fewer than 8 billion records in each, the ID is guaranteed to be unique.
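As a rough illustration of where those numbers come from (this is a minimal sketch assuming the layout described in the Spark documentation, where the partition ID sits in the upper 31 bits of the 64-bit value and the per-partition record number in the lower 33 bits), you can decompose a generated ID like this:

# Hypothetical example: decompose a monotonically_increasing_id value.
# Assumes the documented layout: partition ID in the upper 31 bits,
# record number within the partition in the lower 33 bits.
generated_id = 8589934593                       # e.g. the 2nd record of partition 1
partition_id = generated_id >> 33               # -> 1
record_number = generated_id & ((1 << 33) - 1)  # -> 1
print(partition_id, record_number)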
You will frequently encounter datasets with blanks in them. Missing values can happen for a variety of reasons: system failures, human error, and data schema changes, to name just a few.
The simplest way to deal with missing values, if your data can afford it, is to drop the whole observation when any missing value is found. You have to be careful not to drop too many observations: depending on how the missing values are distributed across your dataset, dropping rows might severely affect its usability. If, after dropping the rows, I end up with a very small dataset, or find that the reduction in data size is more than 50%, I start checking my data to see which features have the most holes in them and perhaps exclude those altogether; a feature that has most of its values missing (unless a missing value bears a meaning) is fairly useless from a modeling point of view.
The other way to deal with observations with missing values is to impute some value in place of those Nones. Given the type of your data, you have several options to choose from:

- If your data is a discrete Boolean, you can turn it into a categorical variable by adding a third category, Missing
- If your data is already categorical, you can simply extend the number of levels and add the Missing category as well
- If your data is ordinal or numerical, you can impute a mean, median, or some other predefined value

Consider a similar example to the one we presented previously:
df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28,   'M', 100000),
    (2, 167.2, 5.4, 45,   'M', None),
    (3, None , 5.2, None, None, None),
    (4, 144.5, 5.9, 33,   'M', None),
    (5, 133.2, 5.7, 54,   'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42,   'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])
In our example, we deal with a number of categories of missing values.
Analyzing rows, we see the following:
- The row with ID 3 has only one useful piece of information: the height
- The row with ID 6 has only one missing value: the age
Analyzing columns, we can see the following:
- The income column, since it is a very personal thing to disclose, has most of its values missing
- The weight and gender columns have only one missing value each
- The age column has two missing values

To find the number of missing observations per row, we can use the following snippet:
df_miss.rdd.map(
    lambda row: (row['id'], sum([c == None for c in row]))
).collect()
It produces the following output:
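For our sample data (the order should follow the order in which we created the rows), the result should be:

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]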
It tells us that, for example, the row with ID 3 has four missing observations, as we observed earlier.
Let's see what values are missing so that when we count missing observations in columns, we can decide whether to drop the observation altogether or impute some of the observations:
df_miss.where('id == 3').show()
Here's what we get:
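The row with ID 3 should show nulls everywhere except in the id and height columns, roughly like this:

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+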
Let's now check what percentage of missing observations there are in each column:
df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()
This generates the following output:
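For our seven-row DataFrame, the fractions work out roughly as follows (values rounded here; .show() prints the full floating-point digits):

id_missing: 0.0, weight_missing: ~0.14, height_missing: 0.0, age_missing: ~0.29, gender_missing: ~0.14, income_missing: ~0.71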
So, we have 14% of missing observations in the weight and gender columns, twice as much in the age column, and almost 72% of missing observations in the income column. Now we know what to do.
First, we will drop the 'income' feature, as most of its values are missing.
df_miss_no_income = df_miss.select([
    c for c in df_miss.columns if c != 'income'
])
We now see that we do not need to drop the row with ID 3, as the coverage in the 'weight' and 'age' columns has enough observations (in our simplified example) to calculate the mean and impute it in place of the missing values.
However, if you decide to drop the observations instead, you can use the .dropna(...) method, as shown in the following example. We will also use the thresh parameter, which specifies the minimum number of non-missing values a row must contain to be kept; any row with fewer non-missing values is dropped. This is useful if you have a dataset with tens or hundreds of features and you only want to drop rows with too many missing values:
df_miss_no_income.dropna(thresh=3).show()
The preceding code produces the following output:
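With thresh=3, only the row with ID 3 (which has just two non-missing values) gets dropped, so the output should look roughly like this:

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+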
On the other hand, if you want to impute the observations, you can use the .fillna(...) method. This method accepts a single integer (a long is also accepted), float, or string; all missing values in the whole dataset will then be filled in with that value. You can also pass a dictionary of the form {'<colName>': <value_to_impute>}. This has the same limitation in that, as the <value_to_impute>, you can only pass an integer, float, or string.
If you want to impute a mean, median, or other calculated value, you need to first calculate the value, create a dictionary with those values, and then pass it to the .fillna(...) method.
Here's how we do it:
means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c)
      for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()
The preceding code will produce the following output:
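The missing weight and age values get replaced by the column means and the missing gender entries by the string missing, so the output should look roughly as follows (the weight mean is shown rounded here; .show() prints the full floating-point digits):

+---+------+------+---+-------+
| id|weight|height|age| gender|
+---+------+------+---+-------+
|  1| 143.5|   5.6| 28|      M|
|  2| 167.2|   5.4| 45|      M|
|  3|140.28|   5.2| 40|missing|
|  4| 144.5|   5.9| 33|      M|
|  5| 133.2|   5.7| 54|      F|
|  6| 124.1|   5.2| 40|      F|
|  7| 129.2|   5.3| 42|      M|
+---+------+------+---+-------+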
We omit the gender column, as one cannot calculate the mean of a categorical variable.
We use a double conversion here. Taking the output of the .agg(...) method (a PySpark DataFrame), we first convert it into a pandas DataFrame and then once more into a dictionary.
Note that calling .toPandas() can be problematic, as the method works essentially in the same way as .collect() in RDDs: it collects all the information from the workers and brings it over to the driver. It is unlikely to be a problem with the preceding dataset, unless you have thousands upon thousands of features.
The records parameter to the .to_dict(...) method of pandas instructs it to create the following dictionary:
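With our sample data, the dictionary should look roughly like this (means rounded here):

{'id': 4.0, 'weight': 140.28, 'height': 5.47, 'age': 40.4}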
Since we cannot calculate the average (or any other numeric metric) of a categorical variable, we added the missing category to the dictionary for the gender feature. Note that, even though the mean of the age column is 40.4, when imputed, the type of the df_miss_no_income.age column was preserved: it is still an integer.
Outliers are those observations that deviate significantly from the distribution of the rest of your sample. The definitions of significance vary, but in the most general form, you can accept that there are no outliers if all the values fall roughly within the range between Q1 - 1.5 * IQR and Q3 + 1.5 * IQR, where IQR is the interquartile range; the IQR is defined as the difference between the upper and lower quartiles, that is, the 75th percentile (Q3) and the 25th percentile (Q1), respectively.
Let's, again, consider a simple example:
df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
Now we can use the definition we outlined previously to flag the outliers.
First, we calculate the lower and upper cut-off points for each feature using the .approxQuantile(...) method. The first parameter specified is the name of the column, the second parameter can be either a number between 0 and 1 (where 0.5 means to calculate the median) or a list (as in our case), and the third parameter specifies the acceptable level of error for each metric (if set to 0, it will calculate an exact value for the metric, but doing so can be really expensive):
cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(
        col, [0.25, 0.75], 0.05
    )

    IQR = quantiles[1] - quantiles[0]

    bounds[col] = [
        quantiles[0] - 1.5 * IQR,
        quantiles[1] + 1.5 * IQR
    ]
The bounds dictionary holds the lower and upper bounds for each feature:
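Depending on how the approximate quantiles resolve on such a tiny dataset, the bounds should come out roughly as follows (exact values may vary slightly because of the 0.05 approximation error, and floating-point arithmetic may add some extra digits):

{'weight': [91.7, 191.7], 'height': [4.5, 6.1], 'age': [-11.0, 93.0]}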
Let's now use it to flag our outliers:
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()
The preceding code produces the following output:
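Only the row with ID 3 should be flagged, in both the weight and the age columns:

+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+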
We have one outlier in the weight feature and one in the age feature, both in the row with ID 3. By now you should know how to extract these, but here is a snippet that lists the values significantly differing from the rest of the distribution:
df_outliers = df_outliers.join(outliers, on='id')

df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()
The preceding code will give you the following output:
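Both filters should return the single offending row:

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+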
Equipped with the methods described in this section, you can quickly clean up even the biggest of datasets.