Spark's MLlib implements a collaborative filtering algorithm called Alternating Least Squares (ALS) to build recommendation systems.
ALS models the rating matrix (R) as the product of two low-rank factor matrices, one for users (U) and one for products (V), and learns these factors by minimizing the reconstruction error of the observed ratings. The unknown ratings can subsequently be computed by multiplying these factors. In this way, we can recommend products based on the predicted ratings. The following quote from https://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html describes the algorithm:
"ALS is an iterative algorithm. In each iteration, the algorithm alternatively fixes one factor matrix and solves for the other, and this process continues until it converges. MLlib features a blocked implementation of the ALS algorithm that leverages Spark's efficient support for distributed, iterative computation. It uses native LAPACK to achieve high performance and scales to billions of ratings on commodity clusters."
This is illustrated in Figure 8.1:
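To make the alternating updates concrete, the following is a minimal NumPy sketch of the idea on a tiny, made-up rating matrix. It is not MLlib's blocked implementation; the function name als_sketch, the regularization value reg, and the sample matrix are all assumptions chosen for illustration. Zeros in R stand for unknown ratings.

```python
import numpy as np

def als_sketch(R, observed, rank=2, num_iterations=50, reg=0.1, seed=0):
    """Toy ALS: alternately solve regularized least squares for U and V."""
    rng = np.random.default_rng(seed)
    num_users, num_items = R.shape
    U = rng.normal(scale=0.1, size=(num_users, rank))
    V = rng.normal(scale=0.1, size=(num_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(num_iterations):
        # Fix V and solve for each user's factors from the items they rated.
        for u in range(num_users):
            seen = observed[u]
            V_seen = V[seen]
            U[u] = np.linalg.solve(V_seen.T @ V_seen + ridge,
                                   V_seen.T @ R[u, seen])
        # Fix U and solve for each item's factors from the users who rated it.
        for i in range(num_items):
            seen = observed[:, i]
            U_seen = U[seen]
            V[i] = np.linalg.solve(U_seen.T @ U_seen + ridge,
                                   U_seen.T @ R[seen, i])
    return U, V

# Hypothetical 5 users x 4 items rating matrix; 0 means "not rated".
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [1, 0, 0, 4],
              [0, 1, 5, 4]], dtype=float)
observed = R > 0

U, V = als_sketch(R, observed)
predicted = U @ V.T  # fills in the unknown entries as well
rmse_observed = np.sqrt(np.mean((R[observed] - predicted[observed]) ** 2))
print("RMSE on observed ratings:", rmse_observed)
```

Each inner step is an ordinary regularized least-squares problem, which is why fixing one factor matrix makes solving for the other straightforward.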
Let's implement a recommendation system for movies using this ALS algorithm.
We need to install NumPy, as it is a requirement for the ALS algorithm. Refer to Chapter 7, Machine Learning with Spark and Hadoop, for the NumPy installation procedure.
Let's download the public MovieLens dataset, which contains one million ratings from 6,040 users on 3,706 movies, from http://files.grouplens.org/datasets/movielens/ml-1m.zip. Once downloaded, copy the files to HDFS:
wget http://files.grouplens.org/datasets/movielens/ml-1m.zip
unzip ml-1m.zip
cd ml-1m
hadoop fs -put movies.dat movies.dat
hadoop fs -put ratings.dat ratings.dat
The movies.dat file contains the movie ID, movie name, and movie genre, separated by ::, as shown in the following snippet:
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
The ratings.dat file contains the user ID, movie ID, rating, and timestamp, separated by ::, as shown in the following snippet:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
Now, let's get into the PySpark shell to work interactively with these datasets and build a recommendation system. Alternatively, you can use the IPython or Zeppelin Notebook to execute the following commands. For execution in YARN mode, change the master to yarn-client:
pyspark --master local[*]
Once you are in the shell, import ALS algorithm-related dependencies and create functions to parse the movies and ratings datasets:
>>> from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
>>> def parseMovie(line):
...     fields = line.strip().split("::")
...     return int(fields[0]), fields[1]
...
>>> def parseRating(line):
...     fields = line.strip().split("::")
...     return int(fields[0]), int(fields[1]), float(fields[2])
...
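Before running these functions over the full files, it can help to check them on single lines outside Spark. The following sketch repeats the two parse functions and applies them to the first sample lines of movies.dat and ratings.dat shown earlier; the variable names are chosen for illustration only.

```python
def parseMovie(line):
    # movies.dat: movieID::title::genres -> keep ID and title
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

def parseRating(line):
    # ratings.dat: userID::movieID::rating::timestamp -> drop the timestamp
    fields = line.strip().split("::")
    return int(fields[0]), int(fields[1]), float(fields[2])

sample_movie = "1::Toy Story (1995)::Animation|Children's|Comedy"
sample_rating = "1::1193::5::978300760"

parsed_movie = parseMovie(sample_movie)
parsed_rating = parseRating(sample_rating)
print(parsed_movie)   # (1, 'Toy Story (1995)')
print(parsed_rating)  # (1, 1193, 5.0)
```

Note that the genre and timestamp fields are discarded, since the model needs only the user ID, movie ID, and rating.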
Create RDDs for movies and ratings by parsing them using the preceding parse functions. Calculate the number of ratings, users, and movies:
>>> moviesRDD = sc.textFile("movies.dat").map(parseMovie)
>>> ratingsRDD = sc.textFile("ratings.dat").map(parseRating)
>>> numRatings = ratingsRDD.count()
>>> numUsers = ratingsRDD.map(lambda r: r[0]).distinct().count()
>>> numMovies = ratingsRDD.map(lambda r: r[1]).distinct().count()
>>> print("Ratings dataset has %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))
Ratings dataset has 1000209 ratings from 6040 users on 3706 movies.
Now, let's create DataFrames for the movies and ratings RDDs to explore the datasets interactively using SQL:
>>> movieSchema = ['movieid', 'name']
>>> ratingsSchema = ['userid', 'movieid', 'rating']
>>> moviesDF = moviesRDD.toDF(movieSchema)
>>> ratingsDF = ratingsRDD.toDF(ratingsSchema)
>>> moviesDF.printSchema()
root
 |-- movieid: long (nullable = true)
 |-- name: string (nullable = true)
>>> ratingsDF.printSchema()
root
 |-- userid: long (nullable = true)
 |-- movieid: long (nullable = true)
 |-- rating: double (nullable = true)
Let's register the DataFrames as temporary views:
>>> ratingsDF.createOrReplaceTempView("ratings")
>>> moviesDF.createOrReplaceTempView("movies")
Now, for each movie, get the maximum and minimum ratings along with the number of users who rated it:
>>> ratingStats = spark.sql(
... """SELECT movies.name, movieratings.maxrtng, movieratings.minrtng, movieratings.cntusr
...    FROM (SELECT ratings.movieid,
...                 max(ratings.rating) AS maxrtng,
...                 min(ratings.rating) AS minrtng,
...                 count(distinct(ratings.userid)) AS cntusr
...          FROM ratings
...          GROUP BY ratings.movieid) movieratings
...    JOIN movies ON movieratings.movieid = movies.movieId
...    ORDER BY movieratings.cntusr DESC""")
>>> ratingStats.show(5)
The output is shown in the following screenshot:
Find the top 10 most active users and how many movies each of them rated, and display the first five:
>>> mostActive = spark.sql(
... """SELECT ratings.userid, count(*) AS cnt
...    FROM ratings
...    GROUP BY ratings.userid
...    ORDER BY cnt DESC
...    LIMIT 10""")
>>> mostActive.show(5)
The output of the preceding command is shown in the following screenshot:
As per the preceding result, userid 4169 is the most active user. Let's find the movies that user 4169 rated higher than 4:
>>> user4169 = spark.sql(
... """SELECT ratings.userid, ratings.movieid, ratings.rating, movies.name
...    FROM ratings
...    JOIN movies ON movies.movieId = ratings.movieid
...    WHERE ratings.userid = 4169 AND ratings.rating > 4""")
>>> user4169.show(5)
The output is shown in the following screenshot:
Now, let's use MLlib's ALS algorithm to create a model. First, we need to separate the ratings data into two parts: training (80%) and testing (20%). We train the model on the training data, generate predictions for the user and movie pairs in the testing data, and then compare those predictions with the actual test ratings. This validation on held-out data reveals the accuracy of the model. Usually, this process is repeated over multiple iterations with different splits of the data (cross-validation) to get a more reliable estimate and to tune the model. In this exercise of building a recommendation engine, we will do only one such iteration.
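The repeated evaluation over different sets of data mentioned above is commonly organized as k-fold cross-validation. The following plain Python sketch shows only the splitting logic on a small list of made-up (user ID, movie ID, rating) tuples; the function name k_fold_splits and the sample records are assumptions for illustration, and no Spark API is involved.

```python
import random

def k_fold_splits(records, k=5, seed=42):
    """Yield (train, test) pairs; each record lands in exactly one test fold."""
    shuffled = list(records)
    random.Random(seed).shuffle(shuffled)
    # Deal the shuffled records into k folds of near-equal size.
    folds = [shuffled[i::k] for i in range(k)]
    for i in range(k):
        test = folds[i]
        train = [r for j, fold in enumerate(folds) if j != i for r in fold]
        yield train, test

# Hypothetical ratings: (user ID, movie ID, rating)
ratings = [(u, m, float((u + m) % 5 + 1)) for u in range(1, 6) for m in range(1, 3)]

splits = list(k_fold_splits(ratings, k=5))
for train, test in splits:
    print(len(train), len(test))
```

In each round, a model would be trained on the train part and scored on the test part, and the scores averaged across the k rounds.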
Use the randomSplit() method to create the training and testing data, and cache them so that the iterative ALS algorithm will perform better:
>>> RDD1, RDD2 = ratingsRDD.randomSplit([0.8, 0.2])
>>> trainingRDD = RDD1.cache()
>>> testRDD = RDD2.cache()
>>> trainingRDD.count()
800597
>>> testRDD.count()
199612
Let's create the model by running the ALS algorithm, which takes trainingRDD (user ID, movie ID, and rating) as input along with rank (the number of latent factors in the model) and the number of iterations to run:
>>> rank = 10
>>> numIterations = 10
>>> model = ALS.train(trainingRDD, rank, numIterations)
Let's examine the available methods on this model. You can see methods such as predict, predictAll, recommendProducts, and recommendUsers. We will use the predictAll method. Additionally, you can save the model using the save method and load it for later use:
>>> dir(model)
The output is shown as follows:
Note that the preceding model is created with the RDD API. It can be created with the DataFrame API as well and can be used with the Pipeline API, as explained in Chapter 7, Machine Learning with Spark and Hadoop.
Now, get the top five movie predictions for user 4169 from the generated model:
>>> user4169Recs = model.recommendProducts(4169, 5)
>>> user4169Recs
[Rating(user=4169, product=128, rating=5.6649367937005231),
 Rating(user=4169, product=2562, rating=5.526190642914254),
 Rating(user=4169, product=2503, rating=5.2328684996745327),
 Rating(user=4169, product=3245, rating=5.1980663524880235),
 Rating(user=4169, product=3950, rating=5.0785092078435197)]
Let's evaluate the model by comparing the generated predictions with the real ratings in testRDD. First, remove the ratings from testRDD to create pairs of user ID and movie ID only. Once these pairs are passed to the model, it will generate the predicted ratings:
>>> testUserMovieRDD = testRDD.map(lambda x: (x[0], x[1]))
>>> testUserMovieRDD.take(2)
[(1, 661), (1, 3408)]
>>> predictionsTestRDD = model.predictAll(testUserMovieRDD).map(lambda r: ((r[0], r[1]), r[2]))
>>> predictionsTestRDD.take(2)
[((4904, 1320), 4.3029711294149289), ((4904, 3700), 4.3938405710892967)]
Let's transform testRDD into key-value pairs of the form ((user ID, movie ID), rating) and then join them with the predicted ratings as follows:
>>> ratingsPredictions = testRDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictionsTestRDD)
>>> ratingsPredictions.take(5)
[((5636, 380), (3.0, 2.5810444309550147)),
 ((5128, 480), (4.0, 3.8897996775001684)),
 ((5198, 248), (1.0, 1.9741132086395059)),
 ((2016, 3654), (4.0, 4.2239704909063338)),
 ((4048, 256), (4.0, 4.1190428484234198))]
The preceding prediction result indicates that, for user ID 5636 and movie ID 380, the actual rating from the testing data was 3.0 and the predicted rating was approximately 2.58. Similarly, for user ID 5128 and movie ID 480, the actual rating was 4.0 and the predicted rating was approximately 3.89.
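The join used here matches records by key and keeps only the keys present on both sides, producing (key, (actual, predicted)) pairs. The following plain Python sketch mimics that behavior on a few records; the helper inner_join is hypothetical and is not part of Spark, and the third test record is assumed to have no prediction purely to show that unmatched keys drop out.

```python
def inner_join(left_pairs, right_pairs):
    """Emulate a key-value inner join: (key, (left_value, right_value))."""
    right_by_key = {}
    for key, value in right_pairs:
        right_by_key.setdefault(key, []).append(value)
    joined = []
    for key, left_value in left_pairs:
        for right_value in right_by_key.get(key, []):
            joined.append((key, (left_value, right_value)))
    return joined

# ((user ID, movie ID), actual rating)
actual = [((5636, 380), 3.0), ((5128, 480), 4.0), ((1, 661), 3.0)]
# ((user ID, movie ID), predicted rating); (1, 661) is assumed missing here
predicted = [((5636, 380), 2.5810444309550147),
             ((5128, 480), 3.8897996775001684)]

joined = inner_join(actual, predicted)
for key, (actual_rating, predicted_rating) in joined:
    print(key, actual_rating, predicted_rating)
```

Because unmatched keys are dropped, the number of joined records can be smaller than the number of test ratings, so it is worth comparing the two counts before interpreting any error metric.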
Now, let's check the model to see how many bad predictions were generated by finding the predicted ratings that were >= 4 when the actual test rating was <= 1:
>>> badPredictions = ratingsPredictions.filter(lambda r: r[1][0] <= 1 and r[1][1] >= 4)
>>> badPredictions.take(2)
[((2748, 1080), (1.0, 4.0622434036284556)), ((4565, 175), (1.0, 4.728689683016448))]
>>> badPredictions.count()
395
So, there are 395 bad predictions out of 199612 test ratings. Next, let's evaluate the model using Mean Squared Error (MSE). MSE is the average of the squared differences between the predicted and the actual ratings:
>>> MeanSquaredError = ratingsPredictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()
>>> print("Mean Squared Error = " + str(MeanSquaredError))
Mean Squared Error = 0.797355258111
The lower the MSE number, the better the predictions are.
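As a small worked example, the following plain Python sketch computes the MSE by hand for the five (actual, predicted) pairs shown in the earlier join output, along with its square root (RMSE), which is expressed in the same units as the ratings. The function name mean_squared_error is chosen for illustration; five pairs are far too few to say anything about the model itself.

```python
import math

# (actual rating, predicted rating) pairs taken from the join output above
pairs = [
    (3.0, 2.5810444309550147),
    (4.0, 3.8897996775001684),
    (1.0, 1.9741132086395059),
    (4.0, 4.2239704909063338),
    (4.0, 4.1190428484234198),
]

def mean_squared_error(pairs):
    # Average of the squared differences between actual and predicted values.
    return sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs)

mse = mean_squared_error(pairs)
rmse = math.sqrt(mse)
print("MSE  = %.4f" % mse)
print("RMSE = %.4f" % rmse)
```

Applying the same square root to the full test-set MSE of 0.797 gives an RMSE of roughly 0.89, meaning that predictions are off by a little under one rating point on a typical test rating.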
Input data can be of two types—explicit feedback or implicit feedback from users. In explicit feedback, users rate items as we have seen in the preceding example. The relationship between a set of user-item pairs is directly known in this case.
In many application domains and real-world cases, ratings are not available and we have access to implicit feedback only. In that case, we need to work with the presence or absence of events, such as whether a user watched a movie or not.
The Spark MLlib ALS algorithm provides a way to deal with implicit feedback. Instead of modeling the ratings directly, this variant treats the input values as a measure of the confidence in the observed user preferences.
The following method is used to build a model using implicit feedback from users:
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)
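To give an intuition for the alpha parameter, the following plain Python sketch shows how a raw implicit signal is commonly turned into a binary preference and a confidence weight, following the formulation in the paper Collaborative Filtering for Implicit Feedback Datasets, on which the MLlib documentation says its approach is based. The function name implicit_targets and the sample watch counts are assumptions for illustration; this is not MLlib's internal code.

```python
def implicit_targets(raw_value, alpha=0.01):
    """Map a raw implicit signal (for example, a watch count) to
    a binary preference and a confidence weight."""
    preference = 1.0 if raw_value > 0 else 0.0
    # Confidence grows linearly with the strength of the observation.
    confidence = 1.0 + alpha * raw_value
    return preference, confidence

# Hypothetical watch counts for one user
watch_counts = {"movie_a": 0, "movie_b": 1, "movie_c": 10}

targets = {name: implicit_targets(count, alpha=0.5)
           for name, count in watch_counts.items()}
for name, (preference, confidence) in sorted(targets.items()):
    print(name, preference, confidence)
```

A larger alpha makes the model trust observed events more strongly relative to unobserved ones, so it is usually tuned together with rank and the number of iterations.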