A recommendation system with MLlib

Spark's MLlib implements a collaborative filtering algorithm called Alternating Least Squares (ALS) to build recommendation systems.

ALS models the rating matrix (R) as the product of a low-rank user factor matrix (U) and a low-rank product factor matrix (V), and learns these factors by minimizing the reconstruction error on the observed ratings. The unknown ratings can subsequently be computed by multiplying the corresponding user and product factors, so we can recommend products based on the predicted ratings. Refer to the following quote from https://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html:

"ALS is an iterative algorithm. In each iteration, the algorithm alternatively fixes one factor matrix and solves for the other, and this process continues until it converges. MLlib features a blocked implementation of the ALS algorithm that leverages Spark's efficient support for distributed, iterative computation. It uses native LAPACK to achieve high performance and scales to billions of ratings on commodity clusters."

This is illustrated in Figure 8.1:


Figure 8.1: The ALS algorithm
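
To make the intuition concrete, the following short NumPy sketch (not MLlib code; the factor values are made up purely for illustration) shows how a predicted rating is simply the dot product of a user's factor vector and a movie's factor vector:

import numpy as np

# Hypothetical rank-2 factors of the kind ALS would learn
U = np.array([[1.2, 0.8],    # user 1 factors
              [0.4, 1.5]])   # user 2 factors
V = np.array([[1.0, 0.3],    # movie 1 factors
              [0.7, 1.1]])   # movie 2 factors

# The rating matrix R is approximated by U multiplied by the transpose of V
R_approx = U.dot(V.T)

# The predicted rating of user 2 for movie 1 is the dot product of their factors
print(R_approx[1, 0])        # same value as U[1].dot(V[0])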

Let's implement a recommendation system for movies using this ALS algorithm.

Note

All programs in this chapter are executed on the CDH 5.8 VM. For other environments, file paths might change, but the concepts are the same in any environment.

Preparing the environment

We need to install NumPy, as it is a requirement for the ALS algorithm. Refer to Chapter 7, Machine Learning with Spark and Hadoop, for the NumPy installation procedure.

Let's download the public MovieLens dataset, which contains one million ratings from 6,040 users on 3,706 movies, from http://files.grouplens.org/datasets/movielens/ml-1m.zip. Once downloaded, copy the files to HDFS:

wget http://files.grouplens.org/datasets/movielens/ml-1m.zip
unzip ml-1m.zip
cd ml-1m
hadoop fs -put movies.dat movies.dat
hadoop fs -put ratings.dat ratings.dat

The movies.dat file contains the movie ID, movie name, and movie genre, separated by ::, as shown in the following snippet:

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy

The ratings.dat file contains the user ID, movie ID, rating, and timestamp, separated by ::, as shown in the following snippet:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291

Creating RDDs

Now, let's get into the PySpark shell to work interactively with these datasets and build a recommendation system. Alternatively, you can use the IPython or Zeppelin Notebook to execute the following commands. For execution in YARN mode, change the master to yarn-client:

pyspark --master local[*]
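
For example, to launch the shell against YARN on the CDH VM (assuming YARN is configured; the exact master string can differ across Spark versions), you could run:

pyspark --master yarn-client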

Once you are in the shell, import ALS algorithm-related dependencies and create functions to parse the movies and ratings datasets:

>>> from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

>>> def parseMovie(line):
     fields = line.strip().split("::")
     return int(fields[0]), fields[1]

>>> def parseRating(line):
     fields = line.strip().split("::")
     return int(fields[0]), int(fields[1]), float(fields[2])

Create RDDs for movies and ratings by parsing them using the preceding parse functions. Calculate the number of ratings, users, and movies:

>>> moviesRDD = sc.textFile("movies.dat").map(parseMovie)
>>> ratingsRDD = sc.textFile("ratings.dat").map(parseRating)

>>> numRatings = ratingsRDD.count()
>>> numUsers = ratingsRDD.map(lambda r:r[0]).distinct().count()
>>> numMovies = ratingsRDD.map(lambda r: r[1]).distinct().count()

>>> print "Ratings dataset has %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)
Ratings dataset has 1000209 ratings from 6040 users on 3706 movies.

Exploring the data with DataFrames

Now, let's create DataFrames for the movies and ratings RDDs to explore the datasets interactively using SQL:

>>> movieSchema = ['movieid', 'name']
>>> ratingsSchema = ['userid', 'movieid', 'rating']
>>> moviesDF  = moviesRDD.toDF(movieSchema)
>>> ratingsDF = ratingsRDD.toDF(ratingsSchema)

>>> moviesDF.printSchema()
root
 |-- movieid: long (nullable = true)
 |-- name: string (nullable = true)

>>> ratingsDF.printSchema()
root
 |-- userid: long (nullable = true)
 |-- movieid: long (nullable = true)
 |-- rating: double (nullable = true)

Let's register the DataFrames as temporary tables:

>>> ratingsDF.createOrReplaceTempView("ratings")
>>> moviesDF.createOrReplaceTempView("movies")

Now, get the maximum and minimum ratings along with the count of users who have rated a movie:

>>> ratingStats = spark.sql(
   """select movies.name, movieratings.maxrtng, movieratings.minrtng, movieratings.cntusr
     from(SELECT ratings.movieid, max(ratings.rating) as maxrtng,
     min(ratings.rating) as minrtng, count(distinct(ratings.userid)) as cntusr 
     FROM ratings group by ratings.movieid ) movieratings
     join movies on movieratings.movieid=movies.movieId
     order by movieratings.cntusr desc""")

>>> ratingStats.show(5)

The output is shown in the following screenshot:


Figure 8.2: A screenshot of ratingStats

Show the top 10 most active users and how many ratings each of them submitted:

>>> mostActive = spark.sql(
   """SELECT ratings.userid, count(*) as cnt from ratings
             group by ratings.userid order by cnt desc limit 10""")

>>> mostActive.show(5)

The output of the preceding command is shown in the following screenshot:


Figure 8.3: A screenshot of the most active users

As per the preceding result, userid 4169 is the most active user. Let's find the movies that user 4169 rated higher than 4:

>>> user4169 = spark.sql("""SELECT ratings.userid, ratings.movieid,
   ratings.rating, movies.name FROM ratings JOIN movies
   ON movies.movieId=ratings.movieid
   where ratings.userid=4169 and ratings.rating > 4""")
>>> user4169.show(5)

The output is shown in the following screenshot:


Figure 8.4: A screenshot of user 4169

Creating training and testing datasets

Now, let's use MLlib's ALS algorithm to create a model. First, we need to split the ratings data into two parts: training (80%) and testing (20%). We will train the model on the training data, generate predictions, and then compare those predictions with the actual ratings in the testing data to validate the model. This validation process reveals the accuracy of the model. Usually, it is repeated over multiple iterations with different splits of the data to improve the accuracy of the model. In this exercise of building a recommendation engine, we will perform only one such iteration.

Use the randomSplit() method to create training data and testing data, and cache them so that the iterative ALS algorithm will perform better:

>>> RDD1, RDD2 = ratingsRDD.randomSplit([0.8, 0.2])
>>> trainingRDD = RDD1.cache()
>>> testRDD = RDD2.cache()
>>> trainingRDD.count()
800597
>>> testRDD.count()
199612

Creating a model

Let's create the model by running the ALS algorithm, which takes trainingRDD (user ID, movie ID, and rating) as input, along with the rank (the number of latent factors in the model) and the number of iterations to run:

>>> rank = 10
>>> numIterations = 10
>>> model = ALS.train(trainingRDD, rank, numIterations)

Let's examine the available methods on this model. You can see methods such as predict, predictAll, recommendProducts, and recommendUsers. We will use the predictAll method. Additionally, you can save the model using the save method and load it for later use:

>>> dir(model)

The output is shown as follows:


Figure 8.5: A screenshot of model methods
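
For example, the model could be saved to a path and reloaded later; the path used here is only an illustrative placeholder:

>>> model.save(sc, "als_movie_model")
>>> sameModel = MatrixFactorizationModel.load(sc, "als_movie_model")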

Note that the preceding model is created with the RDD-based API. It can be created with the DataFrame-based API as well and can be used with the Pipeline API, as explained in Chapter 7, Machine Learning with Spark and Hadoop.
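
As a rough sketch of the DataFrame-based alternative (assuming Spark 2.x and the ratingsDF created earlier; the column names must match the DataFrame schema):

>>> from pyspark.ml.recommendation import ALS as ALSEstimator
>>> alsEstimator = ALSEstimator(rank=10, maxIter=10, userCol="userid",
                                itemCol="movieid", ratingCol="rating")
>>> dfModel = alsEstimator.fit(ratingsDF)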

Making predictions

Now, get the top five movie predictions for user 4169 from the model generated:

>>> user4169Recs = model.recommendProducts(4169, 5)
>>> user4169Recs
[Rating(user=4169, product=128, rating=5.6649367937005231), Rating(user=4169, product=2562, rating=5.526190642914254), Rating(user=4169, product=2503, rating=5.2328684996745327), Rating(user=4169, product=3245, rating=5.1980663524880235), Rating(user=4169, product=3950, rating=5.0785092078435197)]
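
The recommendations contain only movie IDs. As a quick illustration, they can be mapped back to movie names using the moviesDF DataFrame created earlier, for example:

>>> recMovieIds = [r.product for r in user4169Recs]
>>> moviesDF.filter(moviesDF.movieid.isin(recMovieIds)).show()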

Evaluating the model with testing data

Let's evaluate the model by comparing the generated predictions with the real ratings in testRDD. First, strip the ratings from testRDD so that only user ID and movie ID pairs remain. Once these user ID and movie ID pairs are passed to the model, it will generate the predicted ratings:

>>> testUserMovieRDD = testRDD.map(lambda x: (x[0], x[1]))

>>> testUserMovieRDD.take(2)
[(1, 661), (1, 3408)]

>>> predictionsTestRDD = model.predictAll(testUserMovieRDD).map(lambda r: ((r[0], r[1]), r[2]))

>>> predictionsTestRDD.take(2)
[((4904, 1320), 4.3029711294149289), ((4904, 3700), 4.3938405710892967)]

Let's transform testRDD into the ((user ID, movie ID), rating) key-value format and then join it with the predicted ratings as follows:

>>> ratingsPredictions = testRDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictionsTestRDD)

>>> ratingsPredictions.take(5)
[((5636, 380), (3.0, 2.5810444309550147)), ((5128, 480), (4.0, 3.8897996775001684)), ((5198, 248), (1.0, 1.9741132086395059)), ((2016, 3654), (4.0, 4.2239704909063338)), ((4048, 256), (4.0, 4.1190428484234198))]

The preceding prediction result indicates that, for user ID 5636 and movie ID 380, the actual rating from the testing data was 3.0 and the predicted rating was about 2.58. Similarly, for user ID 5128 and movie ID 480, the actual rating was 4.0 and the predicted rating was about 3.89.

Checking the accuracy of the model

Now, let's check the model to see how many bad predictions were generated by finding the predicted ratings that were >= 4 when the actual test rating was <= 1:

>>> badPredictions = ratingsPredictions.filter(lambda r: r[1][0] <= 1 and r[1][1] >= 4)
>>> badPredictions.take(2)
[((2748, 1080), (1.0, 4.0622434036284556)), ((4565, 175), (1.0, 4.728689683016448))]
>>> badPredictions.count()
395

So, there are 395 bad predictions out of 199612 test ratings. Next, let's evaluate the model using the Mean Squared Error (MSE). MSE is the average of the squared differences between the predicted and the actual ratings:

>>> MeanSquaredError = ratingsPredictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()
>>> print("Mean Squared Error = " + str(MeanSquaredError))
Mean Squared Error = 0.797355258111

The lower the MSE number, the better the predictions are.
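
Root Mean Squared Error (RMSE), the square root of MSE, is also commonly reported because it is expressed in the same units as the ratings. For example:

>>> import math
>>> print("Root Mean Squared Error = " + str(math.sqrt(MeanSquaredError)))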

Explicit versus implicit feedback

Input data can be of two types: explicit feedback or implicit feedback from users. With explicit feedback, users rate items directly, as we have seen in the preceding example, so the relationship between user-item pairs is known.

In many application domains and real-world cases, however, explicit ratings are not available, and we have access to implicit feedback only. In that case, we need to consider the presence or absence of events, such as whether a movie was watched or not.

The Spark MLlib ALS algorithm provides a way to deal with implicit feedback. Rather than modeling the ratings directly, this approach treats the feedback as numbers representing the strength of observed user actions and relates them to the level of confidence in the user's preferences.

The following method is used to build a model using implicit feedback from users:

model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)
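
For example, if we only knew which movies each user had watched, we could build a hypothetical implicit-feedback dataset from the existing ratings RDD by treating every rating event as a view of strength 1.0, and train on that instead (a sketch, not part of the original example):

>>> implicitRDD = ratingsRDD.map(lambda r: Rating(r[0], r[1], 1.0))
>>> implicitModel = ALS.trainImplicit(implicitRDD, rank, numIterations, alpha=0.01)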
