Movie Recommendations using Matrix Factorization

  • This tutorial gives a quick introduction to making recommendations with the matrix factorization method. We will be using the MovieLens data for making recommendations. The dataset is available here.
  • There is a tutorial on recommender systems and collaborative filtering here, which introduces the dataset and shows how collaborative filtering techniques like user similarity and item similarity can be used to make recommendations.

Matrix Factorization

  • Matrix factorization is also known as the latent factor method. This technique decomposes the original matrix (which represents the ratings given by users for different movies) into two matrices, as follows.

  • The left matrix represents the users as rows and the features as columns, and the cell values represent each user's affinity for these features. There is no explicit interpretation of these features, which is why they are called latent factors. The right matrix represents the movies and their weights on these features.
  • So, when a movie scores highly on the features that a particular user likes, it is expected that the user would like that movie.

Mathematically

If P is the user matrix, with users as rows and features as columns, and Q is the item matrix, with items as rows and features as columns,

Then the user-item rating matrix can be written as

$\mathbf{R} \approx \mathbf{P} \times \mathbf{Q}^T = \hat{\mathbf{R}}$

where every element of the matrix can be computed as,

$\hat{r}_{ij} = p_i^T q_j = \sum_{k=1}^{K}{p_{ik}\,q_{jk}}$

where $K$ is the number of latent factors.
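To make this concrete, here is a tiny NumPy sketch with made-up factor values (not the MovieLens data), showing how $\hat{R}$ is recovered from P and Q:

import numpy as np

# Hypothetical factors: 3 users, 4 movies, K = 2 latent features
P = np.array([[0.9, 0.1],    # user 0 leans heavily on feature 1
              [0.2, 0.8],    # user 1 leans on feature 2
              [0.5, 0.5]])   # user 2 likes both equally
Q = np.array([[1.0, 0.0],    # movie 0 is pure feature 1
              [0.0, 1.0],    # movie 1 is pure feature 2
              [0.7, 0.3],
              [0.4, 0.6]])

R_hat = P @ Q.T              # R_hat[i][j] = p_i . q_j
print(R_hat)                 # user 0 scores movie 0 highest, user 1 scores movie 1 highest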

Using Spark MLlib for making recommendations

Verify Spark Context

In [19]:
sc
Out[19]:
<pyspark.context.SparkContext at 0x102dbf908>
In [20]:
sc.master
Out[20]:
'local[*]'
In [21]:
sc.version
Out[21]:
'2.0.1'
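If you are running outside a notebook where sc and sqlContext are pre-created, they can be set up as follows (a sketch; the application name is an arbitrary choice):

from pyspark.sql import SparkSession

# Build (or reuse) a local session; the contexts hang off it
spark = SparkSession.builder \
            .master('local[*]') \
            .appName('movie-recommendations') \
            .getOrCreate()

sc = spark.sparkContext
sqlContext = spark   # in Spark 2.x, SparkSession exposes the same read API used below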

Read ratings data into dataframe

Define dataframe schema

In [22]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
[StructField('userId', IntegerType()),
 StructField('movieId', IntegerType()),
 StructField('rating', DoubleType())]
)
movies_df_schema = StructType(
[StructField('ID', IntegerType()),
 StructField('title', StringType())]
)

Load the dataset using the schema above

In [23]:
raw_ratings_df = sqlContext.read.format('com.databricks.spark.csv')        \
              .options(inferSchema=False, delimiter = '\t')              \
              .schema(ratings_df_schema)                                 \
              .load( "file:///Users/manaranjan/Documents/Work/Spark/MyTutorials/movies/data/ratings.dat" )

There is another tutorial here, which does the initial exploratory analysis of the MovieLens dataset.

Drop the timestamp column

In [6]:
ratings_df = raw_ratings_df.drop('Timestamp')
In [7]:
ratings_df.show(10)
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   1193|   5.0|
|     1|    661|   3.0|
|     1|    914|   3.0|
|     1|   3408|   4.0|
|     1|   2355|   5.0|
|     1|   1197|   3.0|
|     1|   1287|   5.0|
|     1|   2804|   5.0|
|     1|    594|   4.0|
|     1|    919|   4.0|
+------+-------+------+
only showing top 10 rows

Read movies data into dataframe

In [8]:
raw_movies_df = sqlContext.read.format('com.databricks.spark.csv')            \
              .options(header=True, inferSchema=False, delimiter = "\t")    \
              .schema(movies_df_schema)                                     \
              .load("file:///Users/manaranjan/Documents/Work/Spark/MyTutorials/movies/data/movies.dat")

# The schema above already names the columns 'ID' and 'title',
# so no further dropping or renaming is needed here
movies_df = raw_movies_df

Cache both the dataframes

In [9]:
ratings_df.cache()
movies_df.cache()
Out[9]:
DataFrame[ID: int, title: string]
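Note that cache() is lazy; the data is materialized only when an action runs. A quick way to force it (and sanity-check the row counts):

print(ratings_df.count())    # triggers the cache for ratings
print(movies_df.count())     # triggers the cache for movies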
In [10]:
movies_df.show(10, truncate = False )
+---+----------------------------------+
|ID |title                             |
+---+----------------------------------+
|1  |Toy Story (1995)                  |
|2  |Jumanji (1995)                    |
|3  |Grumpier Old Men (1995)           |
|4  |Waiting to Exhale (1995)          |
|5  |Father of the Bride Part II (1995)|
|6  |Heat (1995)                       |
|7  |Sabrina (1995)                    |
|8  |Tom and Huck (1995)               |
|9  |Sudden Death (1995)               |
|10 |GoldenEye (1995)                  |
+---+----------------------------------+
only showing top 10 rows

Splitting the ratings dataset

  • Split the dataset into training, validation, and test sets in a 60:20:20 proportion.
In [11]:
(training_df, validation_df, test_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed = 42)

Let's cache the data splits

In [12]:
training_df.cache()
validation_df.cache()
test_df.cache()
Out[12]:
DataFrame[userId: int, movieId: int, rating: double]
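Since randomSplit only approximates the requested proportions, it can be worth checking the actual sizes. A small sanity check:

n_train = training_df.count()
n_val = validation_df.count()
n_test = test_df.count()
total = n_train + n_val + n_test

print('train: %d (%.1f%%), validation: %d (%.1f%%), test: %d (%.1f%%)' %
      (n_train, 100.0 * n_train / total,
       n_val, 100.0 * n_val / total,
       n_test, 100.0 * n_test / total))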

Building the model

In [13]:
from pyspark.ml.recommendation import ALS

Initialize the parameters

In [14]:
MAX_ITERATIONS = 10
REG_PARAM = 0.1
SEED_VALUE = 42

Initialize the ALS estimator

In [15]:
als = ALS()
In [16]:
# Now we set the parameters for the method
als.setMaxIter(MAX_ITERATIONS)          \
 .setSeed(SEED_VALUE)                 \
 .setRegParam(REG_PARAM)              \
 .setUserCol('userId')                \
 .setItemCol('movieId')               \
 .setRatingCol('rating')
Out[16]:
ALS_423d8997d18d18e757a1
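Equivalently, the same parameters can be passed to the ALS constructor in one step:

als = ALS(maxIter = MAX_ITERATIONS, regParam = REG_PARAM, seed = SEED_VALUE,
          userCol = 'userId', itemCol = 'movieId', ratingCol = 'rating')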

Define an evaluator

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

Search for the best rank

  • The rank is the number of latent factors created during matrix factorization.
In [18]:
ranks = [4, 8, 12, 15]
errors = [0, 0, 0, 0]
models = [0, 0, 0, 0]
count = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:

  # Build the model    
  als.setRank(rank)
  model = als.fit(training_df)

  # Make predictions on validation dataset  
  predict_df = model.transform(validation_df)

  # Remove rows where ALS could not make a prediction
  # (users or movies absent from the training split yield NaN)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
  error = reg_eval.evaluate(predicted_ratings_df)
  errors[count] = error
  models[count] = model
  print( 'For rank %s the RMSE is %s' % (rank, error) )

  if error < min_error:
      min_error = error
      best_rank = count
  count += 1

als.setRank(ranks[best_rank])
print( 'The best model was trained with rank %s' % ranks[best_rank] )
For rank 4 the RMSE is 0.8867890644965285
For rank 8 the RMSE is 0.8797676929458298
For rank 12 the RMSE is 0.8755249202840839
For rank 15 the RMSE is 0.8764000006223175
The best model was trained with rank 12
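The same search could also be expressed with Spark's built-in tuning utilities, for example ParamGridBuilder with TrainValidationSplit. A sketch (with a caveat: users or movies absent from the internal training split yield NaN predictions, which the manual loop above filters out; Spark 2.2+ adds coldStartStrategy = 'drop' to ALS for this):

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

param_grid = ParamGridBuilder().addGrid(als.rank, [4, 8, 12, 15]).build()

tvs = TrainValidationSplit(estimator = als,
                           estimatorParamMaps = param_grid,
                           evaluator = reg_eval,
                           trainRatio = 0.75)   # splits its input internally

# Fit on train + validation only, keeping the test split untouched.
# Caveat: NaN predictions can make the internal RMSE NaN in Spark 2.0.
tvs_model = tvs.fit(training_df.union(validation_df))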

Select the model with best rank

In [24]:
best_model = models[best_rank]

Apply the model on test dataset

In [26]:
test_predict_df = best_model.transform(test_df)
In [27]:
test_predict_df.show(10)
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   673|    148|   5.0| 2.8468165|
|  1242|    148|   3.0| 1.8392601|
|  3539|    148|   3.0| 2.6630893|
|  1605|    148|   2.0| 2.1126497|
|   752|    148|   4.0|  2.807478|
|   424|    148|   4.0|  2.645493|
|  3841|    463|   3.0| 2.7476552|
|  3683|    463|   1.0| 1.4153442|
|   721|    463|   4.0| 3.2623837|
|  5511|    463|   2.0| 3.3263845|
+------+-------+------+----------+
only showing top 10 rows

In [28]:
# Again remove the NaN predictions before evaluating
test_predict_df = test_predict_df.filter(test_predict_df.prediction != float('nan'))

Compute the test accuracy

In [41]:
test_RMSE = reg_eval.evaluate(test_predict_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))
The model had a RMSE on the test set of 0.8749194031094534

The model has an RMSE of about 0.875 on the test set. Roughly speaking, when it predicts a rating for a movie for a user, the prediction is off by about 0.87 rating points on average.

Conclusion:

Once the accuracy is acceptable, we can build the model on all of the data and use it to make predictions. To make recommendations for a specific user, we can restrict the candidates to the movies that he or she has not rated earlier, and then either keep the movies whose predicted rating is beyond a certain cutoff value, or sort all the candidate movies by their predicted rating and make the top 5 or 10 recommendations.
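As an illustration of that last step, here is a sketch that produces top-5 recommendations for one user with the names defined above; USER_ID = 1 is just an example value:

from pyspark.sql import functions as F

USER_ID = 1   # hypothetical user to recommend for

# Movies this user has already rated
rated = ratings_df.filter(ratings_df.userId == USER_ID).select('movieId')

# Candidate (user, movie) pairs: every movie the user has NOT rated yet
candidates = movies_df.join(rated, movies_df.ID == rated.movieId, 'left_anti') \
                      .select(F.lit(USER_ID).alias('userId'),
                              movies_df.ID.alias('movieId'))

# Score the candidates, drop NaNs, and keep the 5 highest predicted ratings
top_5 = best_model.transform(candidates) \
                  .filter(~F.isnan('prediction')) \
                  .join(movies_df, F.col('movieId') == movies_df.ID) \
                  .orderBy('prediction', ascending = False) \
                  .select('title', 'prediction') \
                  .limit(5)

top_5.show(truncate = False)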