The New MongoDB Connector for Apache Spark In Action: Building a Movie Recommendation Engine

February 14, 2017 Sam Weaver

Introduction

We are delighted to announce general availability of the new, native MongoDB Connector for Apache Spark. It provides higher performance, greater ease of use, and access to more advanced Spark functionality than other connectors. With certification from Databricks, the company founded by the creators of the Apache Spark project, developers can focus on building modern, data-driven applications, knowing that the connector provides seamless integration and complete API compatibility between Spark processes and MongoDB.

Written in Scala, Apache Spark’s native language, the Connector provides a more natural development experience for Spark users. The connector exposes all of Spark’s libraries, enabling MongoDB data to be materialized as DataFrames and Datasets for analysis with machine learning, graph, streaming and SQL APIs, further benefiting from automatic schema inference.

The Connector also takes advantage of MongoDB’s aggregation pipeline and rich secondary indexes to extract, filter, and process only the range of data it needs – for example, analyzing all customers located in a specific geography. This is very different from simple NoSQL datastores that do not offer either secondary indexes or in-database aggregations. In these cases, Apache Spark would need to extract all data based on a simple primary key, even if only a subset of that data is required for the Spark process. This means more processing overhead, more hardware, and longer time-to-insight for the analyst.

To maximize performance across large, distributed data sets, the Spark connector is aware of data locality in a MongoDB cluster. RDDs are automatically processed on workers co-located with the associated MongoDB shard to minimize data movement across the cluster. The nearest read preference can be used to route Spark queries to the closest physical node in a MongoDB replica set, thus reducing latency.

“Users are already combining Apache Spark and MongoDB to build sophisticated analytics applications. The new native MongoDB Connector for Apache Spark provides higher performance, greater ease of use, and access to more advanced Apache Spark functionality than any MongoDB connector available today,” 
-- Reynold Xin, co-founder and chief architect of Databricks

To demonstrate how to use the connector, we’ve created a tutorial that uses MongoDB together with Apache Spark’s machine learning libraries to build a movie recommendation system. This example presumes you have familiarity with Spark. If you are new to Spark but would like to learn the basics of using Spark and MongoDB together, we encourage you to check out our new MongoDB University Course.

Getting started

To get started, please ensure you have downloaded and installed Apache Spark. Note: this tutorial uses Spark 1.6 with Hadoop.

You will also need to have MongoDB running on localhost listening on the default port (27017). You can follow the documentation to get MongoDB up and running.

The complete code can be found in the GitHub repository.

Ensure you have downloaded the data and imported it with mongorestore. Instructions for using mongorestore are in the MongoDB documentation.

Tutorial

To illustrate how to use MongoDB with Apache Spark, here is a simple tutorial that uses Spark machine learning to generate a list of movie recommendations for a user. Here is what we will outline in this tutorial:

  1. How to read data from MongoDB into Spark. The data will contain a list of different user ratings of various movies. The data will also contain a list of personal ratings for a handful of movies for a particular user.
  2. Using the machine learning ALS library for Spark, we will generate some personalized recommendations for a particular user based on the movie ratings of other people in the dataset.
  3. Once the recommendations have been generated, we shall save them back to MongoDB.

Ready? Let’s get started!

As Spark plays particularly nicely with Scala, this tutorial will use Scala code snippets. A Python example can be found in the GitHub repository.

Throughout each step of this tutorial we will flesh out the following code template in order to get a working example by the end.

  package example
   
  import org.apache.log4j.{Level, Logger}
  import org.apache.spark.ml.evaluation.RegressionEvaluator
  import org.apache.spark.ml.recommendation.ALS
  import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.{SparkConf, SparkContext}
   
  import com.mongodb.spark.MongoSpark
  import com.mongodb.spark.config.{ReadConfig, WriteConfig}
   
  /**
  * Represents a user's movie rating
  */
  case class UserMovieRating(user_id: Int, movie_id: Int, rating: Double)
   
  object MovieRecommendation {
   
  /**
  * Run this main method to see the output of this quick example or copy the code into the spark shell
  *
  * @param args takes an optional single argument for the connection string
  * @throws Throwable if an operation fails
  */
  def main(args: Array[String]): Unit = {
   
  }
   
  /**
  * Gets or creates the Spark Context
  */
  def getSparkContext(): SparkContext = {
   
  }
  }

1. Setting up Spark

Before we can do any work with Apache Spark we must first set up the Spark environment and assign the SparkContext. The SparkContext represents the connection to a Spark cluster and can be used to create RDDs and DataFrames. We declare a name for the application and the master it should run against. Let’s flesh out the getSparkContext() method first.

  /**
  * Gets or creates the Spark Context
  */
  def getSparkContext(): SparkContext = {
  val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("MovieRatings")
   
  val sc = SparkContext.getOrCreate(conf)
  sc.setCheckpointDir("/tmp/checkpoint/")
  sc
  }

local[*] runs Spark locally with as many worker threads as there are logical cores on your machine. setCheckpointDir sets the directory in which RDDs are checkpointed; ALS checkpoints periodically to keep the lineage of its iterative computation short. We’re building out this example on our laptops, but if you’re running on a cluster the directory must be a valid HDFS path.
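
To see what local[*] resolves to on your own machine, here is a small sketch in plain Scala (no Spark needed). The object and method names are ours, purely for illustration; the only assumption is that Spark sizes local[*] from the number of logical cores the JVM reports.

```scala
object LocalCores {
  // Number of logical cores visible to the JVM.
  def logicalCores: Int = Runtime.getRuntime.availableProcessors()

  // Builds a master URL with an explicit thread count, e.g. "local[4]".
  // (Hypothetical helper for illustration, not part of Spark.)
  def localMaster(threads: Int): String = {
    require(threads >= 1, "at least one worker thread is needed")
    s"local[$threads]"
  }

  def main(args: Array[String]): Unit = {
    println(s"local[*] would use $logicalCores worker threads on this machine")
    println(s"To cap the thread count, pass ${localMaster(2)} to setMaster instead")
  }
}
```

Passing a fixed value such as local[2] to setMaster can be handy on a laptop when you want to leave some cores free for MongoDB itself.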

2. Setting up reading and writing to MongoDB

Next, we read data from MongoDB into a DataFrame. A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database. This means that we can run SELECT-style operations on DataFrames, so we create a SQLContext from our SparkContext in order to be able to query the DataFrame with SQL.

We’ll also want to make sure that we are saving data back into MongoDB once we are done processing it in Spark.

Our userId 0 is the person for whom we will generate movie recommendations.

The URI in this example assumes MongoDB is running on localhost (127.0.0.1).

  def main(args: Array[String]): Unit = {
  // Set up configurations
  val sc = getSparkContext()
  val sqlContext = SQLContext.getOrCreate(sc)
   
  val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/movies.movie_ratings?readPreference=primaryPreferred"))
  val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movies.user_recommendations"))
   
  val userId = 0
   
  // Load the movie rating data
  val movieRatings = MongoSpark.load(sc, readConfig).toDF[UserMovieRating]
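
The two URIs above pack several settings into one string: host, database, collection, and options such as readPreference. As a rough illustration of how those parts break down, the sketch below pulls them apart with java.net.URI. MongoUriParts and Namespace are hypothetical names of ours, not part of the connector, and this is not a full MongoDB connection string parser: it assumes a single host and a path of the form database.collection.

```scala
import java.net.URI

object MongoUriParts {
  // The pieces of the connection string that this tutorial relies on.
  case class Namespace(host: String, database: String, collection: String, options: Map[String, String])

  def parse(uri: String): Namespace = {
    val parsed = new URI(uri)
    // The path looks like "/movies.movie_ratings": drop the slash, split on the first dot.
    val parts = parsed.getPath.stripPrefix("/").split("\\.", 2)
    require(parts.length == 2, s"expected database.collection in the path of $uri")
    // The query string carries options such as readPreference=primaryPreferred.
    // getQuery returns null when there is no query string, hence the Option wrapper.
    val options = Option(parsed.getQuery).toSeq
      .flatMap(_.split("&").toSeq)
      .map(_.split("=", 2))
      .collect { case Array(key, value) => key -> value }
      .toMap
    Namespace(parsed.getHost, parts(0), parts(1), options)
  }

  def main(args: Array[String]): Unit = {
    println(parse("mongodb://127.0.0.1/movies.movie_ratings?readPreference=primaryPreferred"))
    println(parse("mongodb://127.0.0.1/movies.user_recommendations"))
  }
}
```

So the read configuration points at the movie_ratings collection in the movies database, and the write configuration at user_recommendations in the same database.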

3. Creating a machine learning model for movie recommendations

We are going to use the ALS (alternating least squares) library for Apache Spark to learn from our dataset in order to make predictions for a user. You can learn more about how ALS generates predictions in the Spark documentation.

  // Create the ALS instance and map the movie data
  val als = new ALS()
  .setCheckpointInterval(2)
  .setUserCol("user_id")
  .setItemCol("movie_id")
  .setRatingCol("rating")
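
For intuition about what the model learns: ALS is a matrix factorization method, so each user and each movie ends up with a short vector of latent factors, and a predicted rating is the dot product of the two. The plain-Scala sketch below shows only that final step. The object name and the factor values are made up for illustration; real factors come out of training.

```scala
object LatentFactors {
  // Predicted rating = dot product of the user's and the movie's factor vectors.
  def predict(userFactors: Seq[Double], movieFactors: Seq[Double]): Double = {
    require(userFactors.length == movieFactors.length, "factor vectors must have the same rank")
    userFactors.zip(movieFactors).map { case (u, m) => u * m }.sum
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical rank-2 factors, chosen by hand for this example.
    val user = Seq(1.0, 2.0)
    val movie = Seq(1.5, 0.5)
    println(predict(user, movie)) // 1.0 * 1.5 + 2.0 * 0.5 = 2.5
  }
}
```

The length of these vectors is the rank that we tune in the parameter grid.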

We can build a grid of parameters in order to get the most accurate model possible. We list candidate values for the regularization parameter, the rank and the maximum number of iterations, and every combination of them is tried during training:

  // We use a ParamGridBuilder to construct a grid of parameters to search over.
  // TrainValidationSplit will try all combinations of values and determine best model using the ALS evaluator.
  val paramGrid = new ParamGridBuilder()
  .addGrid(als.regParam, Array(0.1, 10.0))
  .addGrid(als.rank, Array(8, 10))
  .addGrid(als.maxIter, Array(10, 20))
  .build()
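
To make the size of that search concrete, the grid is the Cartesian product of the value lists: 2 x 2 x 2 = 8 parameter combinations, each of which is trained and evaluated. A minimal plain-Scala sketch of the same enumeration (AlsParams and ParamGridSketch are illustrative names of ours, not Spark classes):

```scala
object ParamGridSketch {
  // One candidate setting for the three parameters being tuned.
  case class AlsParams(regParam: Double, rank: Int, maxIter: Int)

  // Cartesian product of the three value lists.
  def grid(regParams: Seq[Double], ranks: Seq[Int], maxIters: Seq[Int]): Seq[AlsParams] =
    for {
      reg  <- regParams
      rank <- ranks
      iter <- maxIters
    } yield AlsParams(reg, rank, iter)

  def main(args: Array[String]): Unit = {
    // Same candidate values as the ParamGridBuilder above.
    val combos = grid(Seq(0.1, 10.0), Seq(8, 10), Seq(10, 20))
    println(s"${combos.size} combinations to train") // 8 combinations to train
    combos.foreach(println)
  }
}
```

Each extra value added to any of the lists multiplies the number of models to train, so it pays to keep the grid small while experimenting on a laptop.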

For training purposes, we must also split our complete data set into two parts: the training data and the validation data. In this case, we use 80% of the data for training and the remaining 20% to validate the model.

  val trainedAndValidatedModel = new TrainValidationSplit()
  .setEstimator(als)
  .setEvaluator(new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction"))
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
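
Two ideas are bundled into that configuration: the 80/20 split and the RMSE (root-mean-square error) metric used to score each candidate model. The plain-Scala sketch below illustrates both on ordinary collections. It is a simplification under our own assumptions: the names are ours, and the split here cuts a shuffled list at exactly 80%, whereas Spark samples rows randomly so its proportions are only approximate.

```scala
import scala.util.Random

object SplitAndRmse {
  // Shuffles with a fixed seed, then cuts the data at trainRatio.
  def split[A](data: Seq[A], trainRatio: Double, seed: Long): (Seq[A], Seq[A]) = {
    require(trainRatio > 0.0 && trainRatio < 1.0, "trainRatio must be between 0 and 1")
    val shuffled = new Random(seed).shuffle(data)
    val cut = math.round(data.size * trainRatio).toInt
    shuffled.splitAt(cut)
  }

  // Root-mean-square error over (actual, predicted) rating pairs; lower is better.
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    val squaredErrors = pairs.map { case (actual, predicted) => (actual - predicted) * (actual - predicted) }
    math.sqrt(squaredErrors.sum / pairs.size)
  }

  def main(args: Array[String]): Unit = {
    val (train, validation) = split(1 to 10, trainRatio = 0.8, seed = 42L)
    println(s"train: ${train.size}, validation: ${validation.size}") // train: 8, validation: 2

    // Hand-picked pairs where every prediction is off by exactly 2.
    println(rmse(Seq((5.0, 3.0), (1.0, 3.0), (4.0, 2.0)))) // 2.0
  }
}
```

An RMSE of 2.0 on a five-point rating scale would be a poor model; the grid search keeps whichever parameter combination scores lowest on the validation data.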

Calling fit splits the data, trains a model for each parameter combination, and keeps the one that has the best fit for our data:

  // Calculating the best model
  val bestModel = trainedAndValidatedModel.fit(movieRatings)

4. Combine our personal ratings with the rest of the data set

Once we have our model, we will want to use the personal ratings and combine them with the rest of the dataset in order to train a new model based on the complete set:

  // Combine the datasets
  val userRatings = MongoSpark.load(sc, readConfig.copy(collectionName = "personal_ratings")).toDF[UserMovieRating]
  val combinedRatings = movieRatings.unionAll(userRatings)
   
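  // Retrain using the combinedRatings and the parameters of the best ALS estimator.
  // bestModel is the TrainValidationSplitModel; the tuned ALS settings live on the
  // estimator that produced its inner bestModel.
  val combinedModel = als.fit(combinedRatings, bestModel.bestModel.parent.extractParamMap())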

5. Get user recommendations

Now we are ready to generate user recommendations. To get user recommendations, we have to make sure our data set only includes movies that have NOT yet been rated by the user. We also want to make sure that the data set doesn’t contain any duplicates. We create a new DataFrame to hold user recommendations.

  // Get user recommendations
  import sqlContext.implicits._
  val unratedMovies = movieRatings.filter(s"user_id != $userId").select("movie_id").distinct().map(r =>
  (userId, r.getAs[Int]("movie_id"))).toDF("user_id", "movie_id")
  val recommendations = combinedModel.transform(unratedMovies)
   
  // Convert the recommendations into UserMovieRatings for our user
  // (note that toInt truncates the predicted rating to a whole number)
  val userRecommendations = recommendations.map(r =>
  UserMovieRating(userId, r.getAs[Int]("movie_id"), r.getAs[Float]("prediction").toInt)).toDF()
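
The underlying idea, "distinct movies that this user has not rated yet", is a set difference. The sketch below shows it on plain Scala collections rather than DataFrames, with a few invented ratings. Rating and UnratedMovies are hypothetical names, and unlike the DataFrame code above, which selects the movies that appear in other users' rows, this version subtracts the user's own rated movies explicitly from a single combined list.

```scala
object UnratedMovies {
  case class Rating(userId: Int, movieId: Int, rating: Double)

  // Distinct movie IDs in the data set that the given user has not rated, in ascending order.
  def unratedBy(userId: Int, ratings: Seq[Rating]): Seq[Int] = {
    val alreadyRated = ratings.filter(_.userId == userId).map(_.movieId).toSet
    ratings.map(_.movieId).distinct.filterNot(alreadyRated).sorted
  }

  def main(args: Array[String]): Unit = {
    // Invented sample data: user 0 has rated movie 1 only.
    val ratings = Seq(
      Rating(0, 1, 4.0),
      Rating(1, 1, 3.0),
      Rating(1, 2, 5.0),
      Rating(2, 2, 4.0),
      Rating(2, 3, 2.0)
    )
    println(unratedBy(0, ratings)) // List(2, 3)
  }
}
```

Movie 2 appears twice in the sample data but only once in the result, which is the role distinct() plays in the DataFrame version.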

6. Save recommendations to MongoDB

Once we have our recommendations generated, it makes sense to save them back into MongoDB for fast lookup in the future:

  // Save to MongoDB
  MongoSpark.save(userRecommendations.write.mode("overwrite"), writeConfig)

7. Don’t forget to clean up

Finally, let’s clean up the Spark context when we are finished with it. If you are running on Databricks you don’t need to do this step.

  sc.stop()

8. Running the code

You can run the code by using the submit-scala.sh script in the GitHub repo, which will automatically pull down the connector from the online repository.

  $ ./submit-scala.sh

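At the end of the execution you should have a new collection of user recommendations stored in MongoDB (movies.user_recommendations, as set in writeConfig). For reference, these are the personal ratings for user 0 that the recommendations are based on: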

  > db.personal_ratings.find()
  { "_id" : ObjectId("57226a50a45eff77e4dc3fce"), "user_id" : "0", "movie_id" : "1", "rating" : "4" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fcf"), "user_id" : "0", "movie_id" : "2", "rating" : "4" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd0"), "user_id" : "0", "movie_id" : "16", "rating" : "5" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd1"), "user_id" : "0", "movie_id" : "19", "rating" : "3" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd2"), "user_id" : "0", "movie_id" : "47", "rating" : "4" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd3"), "user_id" : "0", "movie_id" : "70", "rating" : "4" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd4"), "user_id" : "0", "movie_id" : "163", "rating" : "5" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd5"), "user_id" : "0", "movie_id" : "173", "rating" : "1" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd6"), "user_id" : "0", "movie_id" : "356", "rating" : "5" }
  { "_id" : ObjectId("57226a50a45eff77e4dc3fd7"), "user_id" : "0", "movie_id" : "364", "rating" : "5" }
  >

That’s it! You just created a program that gets and stores data with MongoDB, processes it in Spark and creates intelligent recommendations for users.

About the Author - Sam Weaver

Sam is the Product Manager for Developer Experience at MongoDB, based in New York. Prior to MongoDB, he worked at Red Hat doing technical presales on Linux, Virtualisation and Middleware. Originally from Cheltenham, England, he received his Bachelor’s in Computer Science from Cardiff University. Sam has also cycled from London to Paris, competed in several extreme sports events such as Tough Mudder, and swum with great white sharks.
