In a previous section, we saw how K-means works, so we can dive straight into the implementation. Since the training will be unsupervised, we need to drop the label column (that is, Region):
val sqlContext = sparkSession.sqlContext
val schemaDF = sqlContext.createDataFrame(rowRDD, header).drop("Region")
schemaDF.printSchema()
schemaDF.show(10)
>>>
Now, we saw in Chapter 1, Analyzing Insurance Severity Claims, and Chapter 2, Analyzing and Predicting Telecommunication Churn, that Spark expects two columns (that is, features and label) for supervised training, whereas for unsupervised training it expects only a single column containing the features. Since we dropped the label column, we now need to assemble all the remaining columns into a single features column. For this, we will again use the VectorAssembler() transformer. First, let's select the columns to be embedded into a vector space:
val featureCols = schemaDF.columns
Then we instantiate the VectorAssembler() transformer, specifying the input columns and the output column:
val assembler =
new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
val assembleDF = assembler.transform(schemaDF).select("features")
Now let's see how it looks:
assembleDF.show()
>>>
Since our dataset is high-dimensional, we can apply a dimensionality reduction algorithm such as PCA. Let's do so by instantiating a PCA() estimator and fitting it as follows:
val pca =
new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(50)
.fit(assembleDF)
Then we transform the assembled DataFrame (that is, assembleDF) to extract the top 50 principal components. You can adjust this number, though. Finally, to avoid ambiguity, we rename the pcaFeatures column to features:
val pcaDF = pca.transform(assembleDF)
.select("pcaFeatures")
.withColumnRenamed("pcaFeatures", "features")
pcaDF.show()
>>>
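Before settling on 50 components, it can be worth checking how much of the original variance they actually retain. The fitted PCAModel exposes an explainedVariance vector for exactly this purpose. The following is a brief sketch that continues from the pca model fitted above:

```scala
// Each entry of explainedVariance is the proportion of total variance
// captured by one principal component; their sum tells us how much of
// the original signal the 50 components retain.
val retained = pca.explainedVariance.toArray.sum
println(f"Variance retained by 50 components: ${retained * 100}%.2f%%")
```

If the retained variance is low, increasing k in setK() is a reasonable next step.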
Excellent! Everything went smoothly. Finally, we are ready to train the K-means algorithm:
val kmeans =
new KMeans().setK(5).setSeed(12345L)
val model = kmeans.fit(pcaDF)
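Once the model is fitted, we can inspect what it learned. A quick sketch, continuing from the model above: transforming the DataFrame adds a prediction column holding each row's cluster assignment, and clusterCenters gives the centroids in the reduced 50-dimensional PCA space:

```scala
// Assign each row to its nearest cluster centre
val predictions = model.transform(pcaDF) // adds a "prediction" column

// How many points fell into each of the 5 clusters?
predictions.groupBy("prediction").count().show()

// The learned centroids live in the 50-dimensional PCA feature space
model.clusterCenters.foreach(println)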
So let's evaluate the clustering by computing the Within-Set Sum of Squared Errors (WSSSE):
val WSSSE = model.computeCost(pcaDF)
println("Within-Cluster Sum of Squares for k = 5 is " + WSSSE)
>>>
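The choice of k = 5 here was somewhat arbitrary. A common heuristic is the elbow method: train K-means for a range of k values, plot the WSSSE against k, and look for the point where the curve flattens. A minimal sketch, reusing pcaDF from above:

```scala
// Elbow method sketch: WSSSE always decreases as k grows, so we look
// for the k beyond which the improvement becomes marginal.
for (k <- 2 to 10) {
  val candidate = new KMeans().setK(k).setSeed(12345L).fit(pcaDF)
  println(s"WSSSE for k = $k is ${candidate.computeCost(pcaDF)}")
}
```

Since K-means is sensitive to initialization, keeping the seed fixed (as above) makes the comparison across k values reproducible.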