Clustering using K-means

Clustering is a class of unsupervised learning algorithms wherein the dataset is partitioned into a finite number of clusters in such a way that the points within a cluster are similar to each other in some way. This, intuitively, also means that the points of two different clusters should be dissimilar.

K-means is one of the popular clustering algorithms, and in this recipe, we'll be looking at how Spark implements K-means and how to use the algorithm to cluster a sample dataset. Since the number of clusters is a crucial input for the K-means algorithm, we'll also see the most common method of arriving at the optimal number of clusters for the data.

How to do it...

Spark provides two modes for cluster center (centroid) initialization: the original Lloyd's method (https://en.wikipedia.org/wiki/K-means_clustering) with random seeding, and a parallelizable and scalable variant of K-means++ (https://en.wikipedia.org/wiki/K-means%2B%2B). K-means++ itself is a variant of the original K-means that differs in the way the initial centroids of the clusters are picked. We can switch between the two by passing KMeans.RANDOM or KMeans.K_MEANS_PARALLEL as the initialization mode. Let's first look at the details of the implementation.
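
As a quick illustration (the k value of 3 and the variable names below are placeholders, not part of the recipe), switching between the two modes on a KMeans instance looks roughly like this:

import org.apache.spark.mllib.clustering.KMeans

//Lloyd's algorithm with purely random seeding
val randomInit = new KMeans()
  .setK(3)
  .setInitializationMode(KMeans.RANDOM)

//Scalable K-means|| seeding
val parallelInit = new KMeans()
  .setK(3)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)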

KMeans.RANDOM

In the regular K-means (the KMeans.RANDOM initialization mode in the case of Spark), the algorithm randomly selects k points (equal to the number of clusters that we expect to see) and marks them as cluster centers (centroids). Then it iteratively does the following (a minimal code sketch of one such iteration follows shortly):

  • Every point is assigned to the cluster whose centroid is nearest to it.
  • The mean of all the points in a cluster is calculated, and this mean becomes the new centroid of that cluster.
  • The points are then reassigned to clusters based on these new centroids, and the process repeats until the assignments stop changing.

Since we generally deal with more than one feature in a dataset, each instance of the data and the centroids are vectors. In Spark, we represent them as org.apache.spark.mllib.linalg.Vector.
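
To make the iteration concrete, here is a minimal sketch of a single Lloyd iteration written against plain Scala collections rather than Spark RDDs; the squaredDistance and lloydIteration helpers are illustrative names, not part of Spark's API:

//Squared Euclidean distance between two feature vectors
def squaredDistance(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

//One Lloyd iteration: assign each point to its nearest centroid, then recompute the means
def lloydIteration(points: Seq[Array[Double]],
                   centroids: Seq[Array[Double]]): Seq[Array[Double]] = {
  //Assignment step: group points by the index of their nearest centroid
  val assignments = points.groupBy { p =>
    centroids.indices.minBy(i => squaredDistance(p, centroids(i)))
  }
  //Update step: the new centroid of a cluster is the mean of its points
  centroids.indices.map { i =>
    assignments.get(i) match {
      case Some(members) =>
        Array.tabulate(members.head.length)(d => members.map(_(d)).sum / members.size)
      case None => centroids(i) //keep the centroid of an empty cluster unchanged
    }
  }
}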

KMeans.K_MEANS_PARALLEL

Scalable K-means or K-means|| is a variant of K-means++. Let's look at what these variants of K-means actually do.

K-means++

Instead of choosing all the centroids randomly, the K-means++ algorithm does the following:

  1. It chooses the first centroid uniformly at random.
  2. It calculates the squared distance of every remaining point from that centroid.
  3. A probability is attached to each point based on this distance: the farther a point is from the centroids chosen so far, the higher its probability of becoming the next centroid.
  4. The second centroid is drawn from the distribution formed in step 3.
  5. In general, once i centroids have been chosen, the next one is found by going over the entire dataset again and forming a distribution based on each point's squared distance from its nearest precomputed centroid.

These steps are repeated for k-1 iterations until k centroids have been selected. K-means++ is known to considerably improve the quality of the centroids. However, in order to select the initial set of centroids, the algorithm makes k passes over the entire dataset. With a large dataset, this becomes a problem.
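
The seeding procedure can be sketched as follows, again on plain Scala collections rather than Spark's actual implementation; kMeansPlusPlusSeeds is an illustrative name, and it reuses the squaredDistance helper from the earlier sketch:

import scala.util.Random

//K-means++ seeding: pick each new centroid with probability proportional
//to its squared distance from the nearest centroid chosen so far
def kMeansPlusPlusSeeds(points: Seq[Array[Double]], k: Int,
                        rng: Random): Seq[Array[Double]] = {
  //Step 1: the first centroid is drawn uniformly at random
  val centroids = scala.collection.mutable.ArrayBuffer(points(rng.nextInt(points.size)))
  while (centroids.size < k) {
    //Squared distance of every point to its nearest already-chosen centroid
    val weights = points.map(p => centroids.map(c => squaredDistance(p, c)).min)
    //Draw the next centroid from the distribution defined by these weights
    val threshold = rng.nextDouble() * weights.sum
    val cumulative = weights.scanLeft(0.0)(_ + _).tail
    val nextIndex = cumulative.indexWhere(_ >= threshold)
    centroids += points(if (nextIndex >= 0) nextIndex else points.size - 1)
  }
  centroids.toSeq
}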

K-means||

With K-means parallel (K-means||), for each iteration, instead of choosing a single point after calculating the probability distribution over all the points in the dataset, many more points are chosen. In the case of Spark, the number of samples chosen per step is 2 * k. Once these initial centroid candidates have been collected, K-means++ is run against just these candidates (instead of the entire dataset) to reduce them to the final k centroids.
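
The number of K-means|| seeding passes can be tuned through the setInitializationSteps setter; in the sketch below, the value 2 is only an example, not a recommendation:

//Controlling the number of K-means|| seeding passes (example value only)
val kmeansWithFewerSteps = new KMeans()
  .setK(3)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL)
  .setInitializationSteps(2)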

Let's now look at the most important parameters that are passed to the algorithm.

Max iterations

There are worst-case scenarios for both the random and the parallel initialization modes. With random initialization, since the initial points are chosen at random, there is a distinct possibility that two centroids land in the same natural cluster. With k=3, for example, two of the actual clusters may end up merged into one while another is split into two. A similar situation can arise with K-means++ given an unlucky choice of the initial set of centroids.

The following figure illustrates this: even though we can clearly see three clusters, a bad choice of centroids splits a single cluster into two and merges the other two into one:

[Figure: three visible clusters mis-clustered because of a bad choice of initial centroids]

To guard against this, the algorithm can be run more than once with different randomly initialized centroids, and the run that produces the model with the lowest cost is kept (older versions of Spark exposed this directly through a runs parameter). The cost of a model is the sum of the squared distances between each point and the centroid of its cluster, with the Euclidean distance as the underlying metric. The maxIterations parameter caps the number of iterations the algorithm performs within a single run before stopping, whether or not it has converged.
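
As a rough sketch of this idea (the seed values, k = 3, and the 20 iterations are arbitrary choices for illustration, and scaledData is the RDD we build later in this recipe), re-running K-means with different seeds and keeping the cheapest model could look like this:

//Manually re-run K-means with different seeds and keep the lowest-cost model
val candidateModels = (1L to 5L).map { seed =>
  val candidate = new KMeans()
    .setK(3)
    .setMaxIterations(20)
    .setInitializationMode(KMeans.RANDOM)
    .setSeed(seed)
    .run(scaledData)
  (candidate, candidate.computeCost(scaledData))
}
val (bestModel, bestCost) = candidateModels.minBy(_._2)
println(s"Best of 5 runs has cost $bestCost")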

Epsilon

How does the K-means algorithm know when to stop? Unless the clusters are separated by a huge margin, there will always be some small distance that a centroid can still move. If all the centroids have moved by less than the epsilon parameter, that is the algorithm's cue that it has converged. In other words, epsilon is nothing but a convergence threshold.
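
For example, the threshold can be tightened through the setEpsilon setter (the value 1e-6 below is just an illustration):

//A stricter convergence threshold: stop only when every centroid moves less than 1e-6
val strictKMeans = new KMeans()
  .setK(3)
  .setMaxIterations(50)
  .setEpsilon(1e-6)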

Now that we have the parameters that need to be passed to K-means out of the way, let's look at the steps needed to run the algorithm and find the clusters:

  1. Importing the data and converting it into a vector.
  2. Feature scaling the data.
  3. Deriving the number of clusters.
  4. Constructing the model.
  5. Evaluating the model.

Importing the data and converting it into a vector

As usual, our input data is in the form of a text file, iris.data. Since we are clustering, we can ignore the label (the species) in the data. Each line of the file holds four comma-separated measurements followed by the species label, for example:

5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa

We read the file, keep the four numeric columns, and convert each line into a dense vector:
import org.apache.spark.mllib.linalg.Vectors

//Keep the four numeric columns and build a dense vector per line
val data = sc.textFile("iris.data").map(line => {
  val dataArray = line.split(",").take(4)
  Vectors.dense(dataArray.map(_.toDouble))
})

Feature scaling the data

When we look at the summary statistics of the data, the data looks alright, but it is always advisable to perform feature scaling before running K-means:

import org.apache.spark.mllib.stat.Statistics

//Column-wise summary statistics of the unscaled features
val stats = Statistics.colStats(data)
println("Statistics before scaling")
print(s"Max : ${stats.max}, Min : ${stats.min}, and Mean : ${stats.mean} and Variance : ${stats.variance}")

Here are the statistics before scaling:

Max : [7.9,4.4,6.9,2.5]
Min : [4.3,2.0,1.0,0.1]
Mean : [5.843333333333332,3.0540000000000003,3.7586666666666666,1.1986666666666668]
Variance : [0.685693512304251,0.18800402684563744,3.113179418344516,0.5824143176733783]

We run the data through StandardScaler and cache the resulting RDD. Since K-means goes through the dataset multiple times, caching the scaled data is strongly recommended to avoid recomputation:

import org.apache.spark.mllib.feature.StandardScaler

//Scale each feature to zero mean and unit standard deviation, then cache
val scaler = new StandardScaler(withMean = true, withStd = true).fit(data)
val scaledData = scaler.transform(data).cache()

The following are the statistics after scaling:

Max : [2.483698580557868,3.1042842692548858,1.7803768862629268,1.7051890410833728]
Min : [-1.8637802962695154,-2.4308436996988485,-1.5634973589465175,-1.4396268133736672]
Mean : [1.6653345369377348E-15,-7.216449660063518E-16,-1.1102230246251565E-16,-3.3306690738754696E-16]
Variance : [0.9999999999999997,1.0000000000000007,1.0000000000000013,0.9999999999999997]

Deriving the number of clusters

Often, we already know the number of clusters present in the dataset. When we aren't sure, the general method is to plot the number of clusters against the cost and watch for the point beyond which the cost stops falling drastically. If the dataset is large, running K-means over the entire dataset just to obtain the number of clusters is computationally expensive. Instead, we can take a random sample and come up with the k value from it. In this example, we take a random 20% sample, but the sample percentage depends entirely on your dataset:

import org.apache.spark.mllib.clustering.KMeans

//Take a sample to come up with the number of clusters
val sampleData = scaledData.sample(false, 0.2).cache()

//Compute the cost for k = 1 to 7 on the sample
val clusterCost = (1 to 7).map { noOfClusters =>

  val kmeans = new KMeans()
    .setK(noOfClusters)
    .setMaxIterations(5)
    .setInitializationMode(KMeans.K_MEANS_PARALLEL) //KMeans||

  val model = kmeans.run(sampleData)

  (noOfClusters, model.computeCost(sampleData))

}

println("Cluster cost on sample data")
clusterCost.foreach(println)

When we plot this, we can see that beyond k = 3 the cost no longer drops drastically. This point is called the elbow, as shown here:

[Figure: cluster cost plotted against the number of clusters, with the elbow at k = 3]

Constructing the model

Now that we have figured out the number of clusters, let's run the algorithm against the entire dataset:

//Let's do the real run for 3 clusters
val kmeans = new KMeans()
  .setK(3)
  .setMaxIterations(5)
  .setInitializationMode(KMeans.K_MEANS_PARALLEL) //KMeans||

val model = kmeans.run(scaledData)
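
With the model in hand, we can also assign new observations to clusters. The sketch below uses made-up measurements; note that a new point must be transformed with the same StandardScalerModel that was fitted on the training data:

//Predict the cluster of a new (made-up) observation after scaling it
val newObservation = Vectors.dense(5.0, 3.4, 1.5, 0.2)
val clusterId = model.predict(scaler.transform(newObservation))
println(s"New observation falls into cluster $clusterId")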

Evaluating the model

The last step is to evaluate the model by printing its cost. The cost is nothing but the sum of the squared distances from each point in a cluster to its centroid. Therefore, for a given number of clusters, a good model is the one with the least cost:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector

//Total cost and cluster centers of the final model
println("Total cost " + model.computeCost(scaledData))
printClusterCenters(model)

def printClusterCenters(model: KMeansModel): Unit = {
  //Cluster centers
  val clusterCenters: Array[Vector] = model.clusterCenters
  println("Cluster centers")
  clusterCenters.foreach(println)
}

Here is the output:

Total cost 34.98320617204239
Cluster centers
[-0.011357501034038157,-0.8699705596441868,0.3756258413625911,0.3106129627676019]
[1.1635361185919766,0.1532643388373168,0.999796072473665,1.0261947088710572]
[-1.0111913832028123,0.839494408624649,-1.3005214861029282,-1.250937862106244]
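
To make the cost metric concrete, here is a small sketch that recomputes the same quantity by hand, as the sum of squared Euclidean distances from each point to the centroid of its assigned cluster (manualCost is an illustrative name):

//Recompute the cost by hand: sum of squared distances to the assigned centroid
val manualCost = scaledData.map { point =>
  val centroid = model.clusterCenters(model.predict(point))
  Vectors.sqdist(point, centroid)
}.sum()
println(s"Manually computed cost: $manualCost")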