We are going to use the Million Song dataset from http://www.kaggle.com/c/msdchallenge/data. You need to download three files:
- kaggle_visible_evaluation_triplets.txt
- kaggle_users.txt
- kaggle_songs.txt
We still need to do some preprocessing. ALS in MLlib takes both user and item IDs as integers. The kaggle_songs.txt file has song IDs with an integer sequence number next to each; the kaggle_users.txt file has no such sequence numbers. Our goal is to replace the user ID and song ID in the triplets data with the corresponding integer sequence numbers. To do this, follow these steps:
- Start Spark shell or Databricks Cloud (preferred):
$ spark-shell
- Do the necessary imports:
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions._
import org.apache.spark.ml.evaluation.RegressionEvaluator
- Initialize the ALS() function:
val als = new ALS()
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("songId")
.setRatingCol("plays")
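The defaults are enough for a first pass, but ALS exposes several hyperparameters worth tuning. A sketch follows; the values here are illustrative assumptions, not tuned for this dataset:

```scala
// Illustrative hyperparameters -- the values are assumptions, not tuned for this dataset
val alsTuned = new ALS()
  .setImplicitPrefs(true)   // treat play counts as implicit feedback
  .setRank(10)              // number of latent factors
  .setMaxIter(10)           // number of ALS iterations
  .setRegParam(0.1)         // regularization strength
  .setAlpha(1.0)            // confidence scaling for implicit feedback
  .setUserCol("userId")
  .setItemCol("songId")
  .setRatingCol("plays")
```

In practice, rank, regParam, and alpha are the parameters that most affect implicit-feedback models and are good candidates for a grid search.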
- Load the kaggle_songs.txt data as a DataFrame:
scala> val songs = spark.read.option("delimiter"," ").option("inferSchema","true").csv("s3a://sparkcookbook/songdata/kaggle_songs.txt").toDF("song","songId")
- Register it as a temporary view:
scala> songs.createOrReplaceTempView("songs")
- Load the user data as a DataFrame:
scala> val users = spark.read.textFile("s3a://sparkcookbook/songdata/kaggle_users.txt").toDF("user").withColumn("userId",monotonically_increasing_id)
- Cast userId to integer (monotonically_increasing_id returns a long, but the maximum value here fits comfortably in the integer range, and ALS expects the user and item ID columns to be integers):
scala> val u = users.withColumn("userId",users.col("userId").cast("integer"))
- Create temporary view:
scala> u.createOrReplaceTempView("users")
- Load the triplets (user, song, plays) data as a DataFrame (note that the delimiter is a tab, written as "\t"):
scala> val triplets = spark.read.option("delimiter","\t").option("inferSchema","true").csv("s3a://sparkcookbook/songdata/kaggle_visible_evaluation_triplets.txt").toDF("user","song","plays")
- Cast plays to double:
scala> val t = triplets.withColumn("plays",triplets.col("plays").cast("double"))
- Register it as a temporary view:
scala> t.createOrReplaceTempView("plays")
- Create a DataFrame with (userId, songId, plays) tuples:
scala> val plays = spark.sql("select userId,songId,plays from plays p join users u on p.user = u.user join songs s on p.song = s.song")
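It is worth sanity-checking the join before training. A quick look at the schema and a few rows (the output will vary with your copy of the data):

```scala
plays.printSchema()   // expect: userId (integer), songId (integer), plays (double)
plays.show(5)         // sample of the joined (userId, songId, plays) rows
plays.count()         // number of triplets that matched both lookup tables
```

If the count is much smaller than the raw triplets count, the join keys are likely mismatched (for example, a wrong delimiter when loading one of the files).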
- Create training, test split:
scala> val Array(training, test) = plays.randomSplit(Array(0.7, 0.3))
- Train the model:
scala> val model = als.fit(training)
- Do predictions on the test dataset:
scala> val predictions = model.transform(test).na.drop
- Initialize the evaluator with mean absolute error as the metric:
scala> val evaluator = new RegressionEvaluator()
.setMetricName("mae")
.setLabelCol("plays")
.setPredictionCol("prediction")
- Evaluate results (the metric configured above is mean absolute error, so name the result accordingly):
scala> val mae = evaluator.evaluate(predictions)
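Beyond evaluation, the trained model can produce recommendations directly. On Spark 2.2 and later, ALSModel provides recommendForAllUsers, and Spark 2.3 adds recommendForUserSubset; a sketch, assuming one of those versions:

```scala
// Top 10 song recommendations per user (Spark 2.2+)
val userRecs = model.recommendForAllUsers(10)
userRecs.show(5, truncate = false)

// Recommendations for a subset of users only (Spark 2.3+)
val someUsers = u.select("userId").limit(3)
val subsetRecs = model.recommendForUserSubset(someUsers, 10)
```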