Exploratory analysis and feature engineering

In this sub-section, we will see some EDA of the dataset before we start preprocessing and feature engineering. Only then creation of an analytics pipeline makes sense. At first, let's import necessary packages and libraries as follows:

import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.Dataset

Then, let's specify the data source and schema for the dataset to be processed. When loading the data into a DataFrame, we can specify the schema. This specification provides optimized performance compared to the pre-Spark 2.x schema inference.

At first, let's create a Scala case class with all the fields specified. The variable names are self-explanatory:

case class CustomerAccount(state_code: String, 
account_length: Integer,
area_code: String,
international_plan: String,
voice_mail_plan: String,
num_voice_mail: Double,
total_day_mins: Double,
total_day_calls: Double,
total_day_charge: Double,
total_evening_mins: Double,
total_evening_calls: Double,
total_evening_charge: Double,
total_night_mins: Double,
total_night_calls: Double,
total_night_charge: Double,
total_international_mins: Double,
total_international_calls: Double,
total_international_charge: Double,
total_international_num_calls: Double,
churn: String)

Now, let's create a custom schema having a structure similar to our already created data source, as follows:

val schema = StructType(Array(
StructField("state_code", StringType, true),
StructField("account_length", IntegerType, true),
StructField("area_code", StringType, true),
StructField("international_plan", StringType, true),
StructField("voice_mail_plan", StringType, true),
StructField("num_voice_mail", DoubleType, true),
StructField("total_day_mins", DoubleType, true),
StructField("total_day_calls", DoubleType, true),
StructField("total_day_charge", DoubleType, true),
StructField("total_evening_mins", DoubleType, true),
StructField("total_evening_calls", DoubleType, true),
StructField("total_evening_charge", DoubleType, true),
StructField("total_night_mins", DoubleType, true),
StructField("total_night_calls", DoubleType, true),
StructField("total_night_charge", DoubleType, true),
StructField("total_international_mins", DoubleType, true),
StructField("total_international_calls", DoubleType, true),
StructField("total_international_charge", DoubleType, true),
StructField("total_international_num_calls", DoubleType, true),
StructField("churn", StringType, true)
))

Let's create a Spark session and import the implicit._ that enables us to specify a DataFrame operation, as follows:

val spark: SparkSession = SparkSessionCreate.createSession("preprocessing")
import spark.implicits._

Now let's create the training set. We read the CSV file with Spark's recommended format, com.databricks.spark.csv. We don't need any explicit schema inference, making the infer Schema false, but instead, we need our own schema we just created previously. Then, we load the data file from our desired location, and finally, specify our data source so that our DataFrame looks exactly the same as we specified:

val trainSet: Dataset[CustomerAccount] = spark.read.
option("inferSchema", "false")
.format("com.databricks.spark.csv")
.schema(schema)
.load("data/churn-bigml-80.csv")
.as[CustomerAccount]

Now, let's see what the schema looks like:

trainSet.printSchema()
>>>

Excellent! It looks exactly the same as the data structure. Now let's see some sample data using the show() method, as follows:

trainSet.show()
>>>

In the following figure, column names are made shorter for visibility on the picture:

We can also see related statistics of the training set using the describe() method from Spark:

The describe() method is a Spark DataFrame's built-in method for statistical processing. It applies summary statistics calculations on all numeric columns. Finally, it returns the computed values as a single DataFrame.
val statsDF = trainSet.describe()
statsDF.show()
>>>

If this dataset can be fit into RAM, we can cache it for quick and repeated access using the cache() method from Spark:

trainSet.cache()

Let's see some useful properties, such as variable correlation with churn. For example, let's see how the churn is related to the total number of international calls:

trainSet.groupBy("churn").sum("total_international_num_calls").show()
>>>
+-----+----------------------------------+
churn|sum(total_international_num_calls)|
+-----+----------------------------------+
|False| 3310.0|
| True| 856.0|
+-----+----------------------------------+

Let's see how the churn is related to the total international call charges:

trainSet.groupBy("churn").sum("total_international_charge").show()
>>>
+-----+-------------------------------+
|churn|sum(total_international_charge)|
+-----+-------------------------------+
|False| 6236.499999999996|
| True| 1133.63|
+-----+-------------------------------+

Now that we also need to have the test set prepared to evaluate the model, let's prepare the same set, similar to the train set, as follows:

val testSet: Dataset[CustomerAccount] = 
spark.read.
option("inferSchema", "false")
.format("com.databricks.spark.csv")
.schema(schema)
.load("data/churn-bigml-20.csv")
.as[CustomerAccount]

Now let's cache them for faster access for further manipulation:

testSet.cache()

Now, let's see some related properties of the training set to understand its suitableness for our purposes. At first, let's create a temp view for persistence for this session. We can create a catalog as an interface that can be used to create, drop, alter, or query underlying databases, tables, functions, and many more:

trainSet.createOrReplaceTempView("UserAccount")
spark.catalog.cacheTable("UserAccount")

Grouping the data by the churn label and calculating the number of instances in each group demonstrates that there are around six times more false churn samples as true churn samples. Let's verify this statement with the following line of code:

trainSet.groupBy("churn").count.show()
>>>
+-----+-----+
|churn|count|
+-----+-----+
|False| 2278|
| True| 388 |
+-----+-----+

We can also see the previous statement, verified using Apache Zeppelin (see more details on how to configure and getting started in Chapter 8, Using Deep Belief Networks in Bank Marketing), as follows:

spark.sqlContext.sql("SELECT churn,SUM(international_num_calls) as Total_intl_call FROM UserAccount GROUP BY churn").show()
>>>

As we have already stated, in most cases the target is to retain the customers who are most likely to leave, as opposed to those who are likely to stay or are staying. This also signifies that we should prepare our training set such that it ensures that our ML model is sensitive to the true churn samples—that is, having churn label true.

We can also observe that the preceding training set is highly unbalanced. Therefore, it would be feasible to put two sample types on the same footing using stratified sampling. The sampleBy() method can be used to do so when provided with fractions of each sample type to be returned.

Here, we're keeping all instances of the True churn class, but downsampling the False churn class to a fraction of 388/2278, which is about 0.1675:

val fractions = Map("False" -> 0.1675, "True" -> 1.0)

This way, we are also mapping only True churn samples. Now, let's create a new DataFrame for the training set containing only downsampled ones:

val churnDF = trainSet.stat.sampleBy("churn", fractions, 12345L)

The third parameter is the seed used for the reproducibility purpose. Now let's see:

churnDF.groupBy("churn").count.show()
>>>
+-----+-----+
|churn|count|
+-----+-----+
|False| 390|
| True| 388|
+-----+-----+

Now let's see how the variables are related to each other. Let's see how the day, night, evening, and international voice calls contribute to the churn class. Just execute the following line:

spark.sqlContext.sql("SELECT churn, SUM(total_day_charge) as TDC, SUM(total_evening_charge) as TEC,    
SUM(total_night_charge) as TNC, SUM(total_international_charge) as TIC,
SUM(total_day_charge) + SUM(total_evening_charge) + SUM(total_night_charge) +
SUM(total_international_charge) as Total_charge FROM UserAccount GROUP BY churn
ORDER BY Total_charge DESC")
.show()
>>>

On Apache Zeppelin, the preceding result can be seen as follows:

Now, let's see how many minutes of day, night, evening, and international voice calls have contributed to the preceding total charge to the churn class. Just execute the following line:

spark.sqlContext.sql("SELECT churn, SUM(total_day_mins) 
+ SUM(total_evening_mins) + SUM(total_night_mins)
+ SUM(total_international_mins) as Total_minutes
FROM UserAccount GROUP BY churn").show()
>>>

On Apache Zeppelin, the preceding result can be seen as follows:

From the preceding two graphs and tables, it is clear that total day minutes and total day charge are a highly correlated feature in this training set, which is not beneficial for our ML model training. Therefore, it would be better to remove them altogether. Moreover, the following graph shows all possible correlations (plotted in PySpark, though):

Figure 5: Correlation matrix, including all the features

Let's drop one column of each pair of correlated fields, along with the State and Area code columns, too, since those will not be used either:

val trainDF = churnDF
.drop("state_code")
.drop("area_code")
.drop("voice_mail_plan")
.drop("total_day_charge")
.drop("total_evening_charge")

Excellent. Finally, we have our training DataFrame that can be used for better predictive modeling. Let's take a look at some columns of the resulting DataFrame:

trainDF.select("account_length", "international_plan", "num_voice_mail",         
"total_day_calls","total_international_num_calls", "churn")
.show(10)
>>>

However, we are not done yet; the current DataFrame cannot be fed to the model as an estimator. As we described, the Spark ML API needs our data to be converted in a Spark DataFrame format, consisting of a label (in Double) and features (in Vector).

Now, we need to create a pipeline to pass the data through and chain several transformers and estimators. The pipeline then works as a feature extractor. More specifically, we have prepared two StringIndexer transformers and a VectorAssembler.

StringIndexer encodes a categorical column of labels to a column of label indices (that is, numerical). If the input column is numeric, we have to cast it into a string and index the string values. Other Spark pipeline components, such as Estimator or Transformer, make use of this string-indexed label. In order to do this, the input column of the component must be set to this string-indexed column name. In many cases, you can set the input column with setInputCol. Interested readers should refer to this https://spark.apache.org/docs/latest/ml-features.html for more details.

The first StringIndexer converts the String categorical feature international_plan and labels into number indices. The second StringIndexer converts the categorical label (that is, churn) to numeric. This way, indexing categorical features enables decision trees and random forest-like classifiers to treat categorical features appropriately, hence improving performance.

Now, add the following lines of code, index labels, and metadata to the label column. Fit on the whole dataset to include all labels in the index:

val ipindexer = new StringIndexer()
.setInputCol("international_plan")
.setOutputCol("iplanIndex")

val labelindexer = new StringIndexer()
.setInputCol("churn")
.setOutputCol("label")

Now we need to extract the most important features that contribute to the classification. Since we have dropped some columns already, the resulting column set consists of the following fields:

* Label → churn: True or False
* Features → {("account_length", "iplanIndex", "num_voice_mail", "total_day_mins", "total_day_calls", "total_evening_mins", "total_evening_calls", "total_night_mins", "total_night_calls", "total_international_mins", "total_international_calls", "total_international_num_calls"}

As we have already converted categorical labels into numeric using StringIndexer, the next task is to extract the features:

val featureCols = Array("account_length", "iplanIndex", 
"num_voice_mail", "total_day_mins",
"total_day_calls", "total_evening_mins",
"total_evening_calls", "total_night_mins",
"total_night_calls", "total_international_mins",
"total_international_calls", "total_international_num_calls")

Now, let's transform the features into feature vectors, which are vectors of numbers representing the value for each feature. In our case, we will use VectorAssembler. It takes all the featureCols and combines/transforms them into a single column called features:

val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")

Now that we have the real training set consisting of labels and feature vectors ready, the next task is to create an estimator—the third element of a pipeline. We start with a very simple but powerful Logistic Regression classifier.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset