How to do it...

We are going to use the Million Song Dataset Challenge data from http://www.kaggle.com/c/msdchallenge/data. You need to download three files:

  • kaggle_visible_evaluation_triplets.txt
  • kaggle_users.txt
  • kaggle_songs.txt
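The three files use different line layouts: the songs file pairs each song ID with a sequence number separated by a space, the users file holds one user ID per line, and the triplets file is tab-separated. A minimal pure-Scala sketch of parsing one line of each, assuming exactly those layouts; the object name and the sample IDs are made up for illustration:

```scala
// Hypothetical sketch: parse one line from each Kaggle MSD file.
// Assumed layouts: kaggle_songs.txt = "<songId> <seq>",
// kaggle_users.txt = "<userId>", triplets = "<userId>\t<songId>\t<plays>".
object MsdLineParsing {
  final case class Triplet(user: String, song: String, plays: Int)

  // Songs file: song ID and its integer sequence number, separated by a space.
  def parseSong(line: String): (String, Int) = {
    val fields = line.trim.split(" ")
    (fields(0), fields(1).toInt)
  }

  // Users file: the whole (trimmed) line is the user ID; no sequence number.
  def parseUser(line: String): String = line.trim

  // Triplets file: three tab-separated fields.
  def parseTriplet(line: String): Triplet = {
    val fields = line.split("\t")
    Triplet(fields(0), fields(1), fields(2).trim.toInt)
  }
}
```

Because the triplets are split on a tab character, a delimiter of the letter "t" would not separate these fields at all.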

We still need to do some more preprocessing. ALS in MLlib expects both user and product (item) IDs to be integers, but the raw data identifies users and songs by strings. The kaggle_songs.txt file lists each song ID followed by an integer sequence number. The kaggle_users.txt file lists user IDs only, without a sequence number, so we have to generate one. Our goal is to replace the string user and song IDs in the triplets data with the corresponding integer sequence numbers. To do this, follow these steps:
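A minimal in-memory sketch of that ID replacement in plain Scala, independent of Spark. It assumes users are numbered by their position in the user list starting at 0, song numbers come from a lookup table like kaggle_songs.txt, and triplets whose user or song has no number are dropped, as an inner join would drop them. The object, method names, and sample IDs are hypothetical:

```scala
// Hypothetical sketch of replacing string IDs with integer sequence numbers.
object IdMapping {
  // Number users by their position in the list: 0, 1, 2, ...
  def userIndex(users: Seq[String]): Map[String, Int] =
    users.zipWithIndex.toMap

  // Rewrite (user, song, plays) triplets as (userId, songId, plays).
  // Triplets whose user or song is missing from a lookup table are dropped,
  // which mirrors the behavior of an inner join.
  def toIntIds(
      triplets: Seq[(String, String, Double)],
      userIds: Map[String, Int],
      songIds: Map[String, Int]
  ): Seq[(Int, Int, Double)] =
    for {
      (user, song, plays) <- triplets
      u <- userIds.get(user)
      s <- songIds.get(song)
    } yield (u, s, plays)
}
```

For example, with users "userA" and "userB" and a song table mapping "SONG2" to 2, the triplet ("userB", "SONG2", 3.0) becomes (1, 2, 3.0), while a triplet for an unknown user disappears.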

  1. Start Spark shell or Databricks Cloud (preferred):
        $ spark-shell
  1. Do the necessary imports:
        import org.apache.spark.ml.recommendation.ALS
        import org.apache.spark.sql.functions._
        import org.apache.spark.ml.evaluation.RegressionEvaluator
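  1. Initialize the ALS estimator for implicit feedback (play counts rather than explicit ratings), naming the user, item, and rating columns. Keeping the chained call on one line lets spark-shell accept it without :paste mode:
        val als = new ALS().setImplicitPrefs(true).setUserCol("userId").setItemCol("songId").setRatingCol("plays")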
  1. Load the kaggle_songs.txt data as a DataFrame:
        scala> val songs = spark.read.option("delimiter"," ").option("inferschema","true").csv("s3a://sparkcookbook/songdata/kaggle_songs.txt").toDF("song","songId")
  1. Register it as a temporary view:
        scala> songs.createOrReplaceTempView("songs")
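  1. Load the user data as a DataFrame and generate a user ID for each line with monotonically_increasing_id. The coalesce(1) keeps all users in a single partition, so the generated IDs are consecutive (0, 1, 2, ...); monotonically_increasing_id stores the partition index in the upper 31 bits and the row number in the lower 33 bits, so a file read as several partitions would get IDs that jump by 2^33 at each partition boundary:
        scala> val users = spark.read.textFile("s3a://sparkcookbook/songdata/kaggle_users.txt").coalesce(1).toDF("user").withColumn("userId", monotonically_increasing_id())
  1. Cast userId to integer. It is generated as a long, but ALS expects user and item IDs to be integers; because the IDs are consecutive from 0, the largest one fits well within the integer range:
        scala> val u = users.withColumn("userId", users.col("userId").cast("integer"))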
  1. Register it as a temporary view:
        scala> u.createOrReplaceTempView("users")
  1. Load the tab-separated triplets (user, song, plays) data as a DataFrame:
        scala> val triplets = spark.read.option("delimiter","\t").option("inferschema","true").csv("s3a://sparkcookbook/songdata/kaggle_visible_evaluation_triplets.txt").toDF("user","song","plays")
  1. Cast plays to double:
        scala> val t = triplets.withColumn("plays",triplets.col("plays").cast("double"))
  1. Register it as a temporary view:
        scala> t.createOrReplaceTempView("plays")
  1. Join the three views to create a DataFrame of (userId, songId, plays) rows, replacing the string user and song IDs with their integer IDs:
        scala> val plays = spark.sql("select userId,songId,plays from plays p join users u on p.user = u.user join songs s on p.song = s.song")
  1. Randomly split the data into training (70%) and test (30%) sets:
        scala> val Array(training, test) = plays.randomSplit(Array(0.7, 0.3))
  1. Train the model:
        scala> val model = als.fit(training)
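  1. Make predictions on the test set. Users or songs that appear only in the test set have no learned factors, so ALS predicts NaN for them; na.drop removes those rows: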
        scala> val predictions = model.transform(test).na.drop
  1. Create an evaluator that computes the mean absolute error (MAE) between the actual play counts and the predictions:
        scala> val evaluator = new RegressionEvaluator().setMetricName("mae").setLabelCol("plays").setPredictionCol("prediction")
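  1. Evaluate the results. Because the model was trained with implicit preferences, its predictions are preference scores rather than play counts, so treat this error as a rough indicator only:
        scala> val mae = evaluator.evaluate(predictions)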
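The integer cast of userId relies on monotonically_increasing_id producing small values. The Spark API documentation describes its layout as the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits. A minimal pure-Scala model of that layout, assuming the non-ANSI cast behavior (the default before Spark 4.0), where casting a long to an integer keeps only the low 32 bits; with ANSI mode enabled the cast raises an overflow error instead. The object and method names are hypothetical:

```scala
// Hypothetical model of monotonically_increasing_id's documented bit layout.
object MonoIdLayout {
  // Partition index in the upper 31 bits, record number in the lower 33 bits.
  def monoId(partitionIndex: Int, recordNumber: Long): Long =
    (partitionIndex.toLong << 33) + recordNumber

  // Non-ANSI long-to-int cast: keep the low 32 bits.
  def castToInt(id: Long): Int = id.toInt
}
```

The first row of partition 1 gets ID 2^33 = 8589934592, whose low 32 bits are all zero, so after the cast it collides with the first row of partition 0. Reading the users into one partition (for example with coalesce(1)) keeps the IDs consecutive and avoids the collision.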
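RegressionEvaluator reports either mean absolute error or root mean squared error depending on setMetricName ("mae" or "rmse"), and the two weight errors differently: RMSE squares each error first, so large misses count more. A minimal pure-Scala sketch of both formulas over (actual plays, prediction) pairs, independent of Spark; the object name is hypothetical:

```scala
// Hypothetical sketch of the two regression metrics.
object ErrorMetrics {
  // Mean absolute error over (label, prediction) pairs.
  def mae(pairs: Seq[(Double, Double)]): Double =
    pairs.map { case (y, p) => math.abs(y - p) }.sum / pairs.size

  // Root mean squared error over (label, prediction) pairs.
  def rmse(pairs: Seq[(Double, Double)]): Double =
    math.sqrt(pairs.map { case (y, p) => (y - p) * (y - p) }.sum / pairs.size)
}
```

For the pairs (3.0, 1.0) and (1.0, 1.0), MAE is (2 + 0) / 2 = 1.0, while RMSE is sqrt((4 + 0) / 2), about 1.414, because the single miss of 2 is squared.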
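With setImplicitPrefs(true), ALS does not try to reproduce the play counts themselves. In the implicit-feedback formulation of Hu, Koren, and Volinsky, which Spark's collaborative filtering guide cites, each play count r becomes a binary preference (1 if r > 0) plus a confidence weight 1 + alpha * r, and the model's predictions approximate the preference. A minimal sketch of that transformation, assuming non-negative play counts and alpha matching ALS's setAlpha (default 1.0); the object and method names are hypothetical:

```scala
// Hypothetical sketch of the implicit-feedback transformation of play counts.
object ImplicitFeedback {
  // Binary preference: was the song played at all?
  def preference(plays: Double): Double = if (plays > 0) 1.0 else 0.0

  // Confidence grows linearly with the play count, scaled by alpha.
  def confidence(plays: Double, alpha: Double = 1.0): Double =
    1.0 + alpha * math.abs(plays)
}
```

A song played 7 times thus has preference 1.0 and confidence 8.0 at the default alpha, which is why the predictions sit near the 0 to 1 range rather than on the scale of the play counts.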