[Activity] Running the Similar Movies Script using Spark's Cluster Manager
A free video tutorial from Sundog Education by Frank Kane
Founder, Sundog Education. Machine Learning Pro
4.5 instructor rating • 19 courses • 407,193 students
We'll run our movie similarities script and see the results.
Learn more from the full course: Apache Spark with Scala - Hands On with Big Data!
Dive right in with 20+ hands-on examples of analyzing large data sets with Apache Spark, on your desktop or on Hadoop!
09:03:23 of on-demand video • Updated September 2020
- Frame big data analysis problems as Apache Spark scripts
- Develop distributed code using the Scala programming language
- Optimize Spark jobs through partitioning, caching, and other techniques
- Build, deploy, and run Spark scripts on Hadoop clusters
- Process continual streams of data with Spark Streaming
- Transform structured data using SparkSQL, DataSets, and DataFrames
- Traverse and analyze graph structures using GraphX
- Analyze massive data sets with Machine Learning on Spark
English Instructor: All right, let's jump into the code here for computing MovieSimilarities using Spark. So, open up the MovieSimilaritiesDataset script here. And this is a case where using a dataset will outperform using an RDD. The most complicated part of this whole algorithm is that self-join operation where we try to find every unique pair of movies that were rated by the same person, and that is a very SQL kind of thing to do, right? A self-join operation. So, this actually fits well into the ideas of using a dataset in SparkSQL. It is possible to do this with RDDs as well, but in this case, datasets do outperform RDDs, which is usually the case. So, we're gonna focus on datasets for this one. If you are morbidly curious about how to do this with RDDs, there's a MovieSimilarities script available here as well that just does it using RDDs instead. But let's focus on datasets. So, what do we have here? Well, we start off by importing all the stuff we need, as usual, and we declare a MovieSimilaritiesDataset object that contains several case classes used to define the various file formats that we're going to read in and use for the datasets that we use along the way throughout our algorithm. So, we have a Movies dataset that is just mirroring the format of the raw data that we're reading in. It consists of a userID integer, a movieID integer, a rating integer, and a long timestamp. We're also going to load up a MovieNames dataset so we can very easily display the human-readable name of a given movieID. That's just going to map movieIDs to movie titles. We will also have a MoviePairs dataset along the way, and this embodies a unique pair of movies. This is sort of the output of that self-join operation where we get every pair of movies that was rated by the same person, and that will consist of both the movie1 and movie2 movieIDs and the ratings associated with each of those movies from a given user.
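The case classes described here might look roughly like this in Scala. Field names and types follow the lecture's description; treat this as a sketch rather than the exact course code:

```scala
// Mirrors the raw u.data ratings format: userID, movieID, rating, timestamp
case class Movies(userID: Int, movieID: Int, rating: Int, timestamp: Long)

// Maps a movieID to its human-readable title (loaded from u.item)
case class MovieNames(movieID: Int, movieTitle: String)

// A unique pair of movies rated by the same user, with both of that user's ratings
case class MoviePairs(movie1: Int, movie2: Int, rating1: Int, rating2: Int)

// A pair of movies, their similarity score, and the number of
// co-ratings supporting that score
case class MoviePairsSimilarity(movie1: Int, movie2: Int, score: Double, numPairs: Long)
```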
So a MoviePairs row will consist of a pair of movies and their ratings by a single user who watched them both. Got it? And then we'll also have a MoviePairsSimilarity dataset at some point that contains a pair of movies, movie1 and movie2, a similarity score in double precision, and the number of pairs that went into computing that similarity score. So that's going to reflect how many users actually watched this pair of movies and rated them together. Let's skip ahead to the main function and work from there. We'll start by setting our log level and creating a Spark session object, as usual. We then create a movieNames schema, which will be used later on for structuring our dataset to look up movie titles from movieIDs. So our movieID will be an integer, and for every movieID there will be a movie title that is a string. We will then declare a schema for u.data, the movie ratings data itself, as well. That's going to have userID, movieID, rating, and timestamp in the structure that we've seen before. We'll then load up those movieNames as our first step. We import spark.implicits because we're loading up a dataframe initially and then converting that to a dataset. So to implicitly infer the schema of a dataframe, even though we're providing it with one, you still need to use spark.implicits for that step. So we tell it that this particular data file, u.item, is separated by pipe delimiters, it is in the ISO-8859-1 character set, and it uses the movieNames schema that we're explicitly providing, because there is no header row in this file that we can use to infer it from. And when we're done with that, we will explicitly convert it to a dataset using the MovieNames case class for even better performance under Spark. We then load up the movie ratings data itself, again, as a dataset. Same idea here.
This one's actually tab-separated, using the movies schema that we defined above, and we will convert that as well to a dataset using the Movies case class. So at this point, we have a movieNames dataset and we have a movies dataset that represents all the individual user ratings. We will then select out the information we care about, just to prevent us from carrying around information we don't need. So our ratings dataset will now be just a userID, movieID, and rating, leaving aside the timestamp column because we don't actually need that information for what we're doing here. Now, here's where it gets interesting. This is that self-join operation we talked about before. So we're gonna have a MoviePairs dataset. Its intent here is to contain every pair of movies rated by the same person. So the way we're doing that is, first of all, we're gonna take our ratings dataset, we're gonna give it an alias of ratings1 so we can refer to it easily, and we will join that with another copy of the ratings dataset that's referred to as ratings2. So we're going to do a join between ratings1 and ratings2, which are really the same thing. They both point to the ratings dataset. So that's why we call this a self-join: we're joining the ratings dataset with itself. And we will join it based on the userID column, okay? So we're saying that we're going to join only if the userID in ratings1 matches the userID in ratings2. This will have the effect of pairing together all the movies that are rated by the same person. Got it? You kind of have to understand how join operations work in SQL, and this isn't really a SQL class, but the upshot is that's what you get out of this expression here: every pair of movies that were rated by the same person. And furthermore, while we're at it, we're going to enforce that the movieID for ratings1 will be less than the movieID coming from ratings2, and we're doing this just to prevent duplicates.
So again, we don't wanna have a separate entry for movie A paired with movie B as well as movie B paired with movie A. By doing this, we just make sure that we're only capturing one unique pairing there. Once we have that, we can then rename things to make them easier to use. So we will rename ratings1.movieID to movie1, ratings2.movieID to movie2, ratings1.rating to rating1, and ratings2.rating to rating2, again, just to make it easier to work with. And furthermore, we will explicitly make this a dataset using the MoviePairs case class that we defined above. Now, we have at our disposal the MoviePairs dataset that contains pairs of movies and their ratings for every unique user that rated that pair of movies. Given that, we can now call computeCosineSimilarity to construct our moviePairSimilarities dataset that will contain, for every pair of movies, how similar they are to each other based on all the users that rated those movies together. So, before we go further and talk about that cache operation, let's see what computeCosineSimilarity does. That's up here somewhere. There it is. So, you know, there's a little bit of fancy math going on here. I don't really want to get into the cosine similarity metric itself too much because that doesn't really have anything to do with Spark programming. Again, if you wanna learn more about that, you can check out my course on recommender systems. But at a high level, we're creating three new columns here, xx, yy, and xy, that compute x squared, y squared, and x times y for the algorithm that we use for computing cosine similarity. Again, it's basically the angle between two virtual vectors in this user-movie space. We can then calculate the similarity scores. Basically, we use that new dataset called pairScores that has those extra terms appended to it, and then we call agg to aggregate together all the entries for every given movie pair using the following expression.
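To make the self-join concrete, here's a plain-Scala analogue of what that join produces. This hypothetical helper works on in-memory collections rather than Spark datasets, but the pairing logic (same userID, movie1 < movie2) is the same idea:

```scala
case class Rating(userID: Int, movieID: Int, rating: Int)
case class MoviePair(movie1: Int, movie2: Int, rating1: Int, rating2: Int)

// Pair up every two ratings from the same user, keeping only
// movie1 < movie2 so each pair of movies appears once per user
def selfJoinPairs(ratings: Seq[Rating]): Seq[MoviePair] =
  for {
    r1 <- ratings
    r2 <- ratings
    if r1.userID == r2.userID && r1.movieID < r2.movieID
  } yield MoviePair(r1.movieID, r2.movieID, r1.rating, r2.rating)
```

Spark does this at scale with a shuffle across the cluster, but the output rows have exactly this shape.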
So for each of the MoviePairs, across all the users that rated those two movies together, we're gonna go through them all with that agg function, and we will sum up the xy column and call that the numerator. That's just gonna be the numerator of our expression for computing cosine similarity. And then for the denominator, which we give the alias denominator, that will end up being the square root of the sum of the xx column multiplied by the square root of the sum of the yy column. And finally, we will have numPairs, which is just going to count up how many entries exist in the xy column. That's just a shortcut for figuring out how many users actually rated this pair of movies together. And that's the information that we need to compute the actual similarity score, which is what's gonna land in this result dataset here. We just add a new score column to that as well, and what we do is we first make sure that we're not gonna be dividing by zero. We check explicitly to make sure the denominator is not zero; otherwise, we just have a null result there. If not, then we just divide the numerator by the denominator, and at this point we have our actual similarity score computed based on the cosine similarity metric. We then just whittle this down to the columns that we care about, which are gonna be movie1 and movie2, the similarity score between them, and the number of pairs supporting that score. And we will force that into a dataset as well using the MoviePairsSimilarity case class above and return the result dataset, which again, at this point, just contains movie1, movie2, the score, and numPairs. Whoo, all right. Back to where that was called. So we now have this dataset of all the similarity scores between every unique pair of movies, and we're probably gonna use that more than once. So let's go ahead and cache it so that we'll have it in memory and ready to go no matter what we do with it later on.
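The aggregation described above implements the cosine similarity formula sum(x·y) / (sqrt(sum(x²)) · sqrt(sum(y²))). Here's the same math on plain Scala collections as a sanity check; this is a sketch of the formula, not the Spark aggregation itself:

```scala
// Cosine similarity over co-ratings: given (ratingForMovie1, ratingForMovie2)
// from every user who rated both movies, compute
//   sum(x * y) / (sqrt(sum(x * x)) * sqrt(sum(y * y)))
// and also return how many co-ratings supported the score.
def cosineSimilarity(pairs: Seq[(Double, Double)]): (Double, Long) = {
  val numerator   = pairs.map { case (x, y) => x * y }.sum
  val denominator = math.sqrt(pairs.map { case (x, _) => x * x }.sum) *
                    math.sqrt(pairs.map { case (_, y) => y * y }.sum)
  // Guard against dividing by zero, just like the Spark version does
  val score = if (denominator != 0) numerator / denominator else 0.0
  (score, pairs.length.toLong)
}
```

Two movies rated identically by everyone who saw both come out with a score of 1.0; disagreement pulls the score down toward 0.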
Not only are we gonna use this to display our results, but you could imagine we could actually make a real item-based collaborative filtering system out of this by keeping that moviePairSimilarities dataset in memory. We could then take the set of everything that a new user has liked or expressed interest in or rated highly, whatever you wanna use as an indication of interest, and then hit that moviePairSimilarities dataset to very quickly get back all the movies similar to the movies that person liked. So again, this would be very important to cache if you were building a real recommender system here. But all we're going to do here is just try to get the results of the top similarities for a given movie. So we're gonna check that we passed in an argument here. The idea is that we're going to pass in, as an argument to this script, a movieID that we're interested in seeing all the similarities to. And furthermore, we're going to set some thresholds here. So we're going to say that unless there's a 97% similarity score between two movies, we're not gonna consider them similar enough to be interesting. And we will also say that you need to have at least 50 users in common that rated both of these movies together. So that's sort of the minimum support that we need to have confidence that this is a reliable result. You don't wanna be making your recommendation based on what two people said, right? Ideally you want many people that agree with each other to give you a better result. So these thresholds end up being important in getting quality results. And they're rather arbitrary, but we'll get back to that. So, we'll apply those filters here. We will take the moviePairSimilarities dataset and filter it, not just to enforce those thresholds, making sure that the score is greater than our scoreThreshold and numPairs is greater than coOccurrenceThreshold, but we're also going to filter it down to the movieID that we're interested in.
So for the movie that we passed in as an argument, that's the movie that we wanna see similar movies to. So we enforce that movie1 is equal to movieID or movie2 is equal to movieID. We don't really know if it's gonna be on the movie1 or the movie2 side. It could be either, depending on the order of the movieIDs, right? So if either of the movies in the movie pair is the one that we're interested in, we're gonna let that through, and we'll also further check that our thresholds for quality are met as well. Once we have that, we will sort it descending based on that score column to get the most similar movies to that movie and take the top 10. So we would call this a top-10 recommender in recommender system parlance, because we're taking the top 10 results and making those our recommendations for this movie. And then we print it out. We just say to get the top 10 similar movies for whatever that movie name is, to make that human-readable, again, using that movieNames dataset that we loaded up a long time ago to look up the title for that movieID. And for every result that we get back, we iterate through it and extract the similar movie, based on whichever movieID is not the one that we passed in as a parameter, and print out that result along with its score and the strength based on the number of pairs that supported that score. Whoo. So hey, it looks like this should work. You know, it was a lot to talk about, but if you look at the code, for all that it does, it's really not that much code, right? It's not really that bad. I mean, there's some funky stuff going on for sure that you need to wrap your head around with these more complicated expressions here, like this aggregate or the self-join operation. But once you get past that, it's not that much code. So let's go ahead and run it and see what happens. Before we run it though, we need to pass in that parameter of which movieID we want results for, right?
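In plain Scala terms, the filtering and top-10 selection boils down to something like this. The thresholds are the ones from the lecture; the helper itself is hypothetical, standing in for the Spark filter/sort/take chain:

```scala
case class MoviePairsSimilarity(movie1: Int, movie2: Int, score: Double, numPairs: Long)

val scoreThreshold = 0.97         // require 97% similarity
val coOccurrenceThreshold = 50L   // require at least 50 users who rated both movies

// Keep pairs involving movieID that pass both quality thresholds,
// then take the 10 highest-scoring matches
def topSimilar(sims: Seq[MoviePairsSimilarity], movieID: Int): Seq[MoviePairsSimilarity] =
  sims.filter { s =>
    (s.movie1 == movieID || s.movie2 == movieID) &&
      s.score > scoreThreshold && s.numPairs > coOccurrenceThreshold
  }.sortBy(-_.score).take(10)
```

For each surviving row, the "other" movie is whichever of movie1/movie2 is not the one we asked about, which is how the printed results get extracted.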
So to do that, I'll show you a little trick here in IntelliJ. Right-click on MovieSimilarities and we're gonna say Create 'MovieSimilarities'. This will create a run configuration that we can set up explicitly, and it has a slot here for program arguments. So you put in whatever movieID you want here for any movie you're interested in. Let's say 56, whatever that happens to be, and we'll say OK. And now up here, we have the MovieSimilarities run configuration that we just defined, and we can hit the play button to run it. So let's kick that off and see what happens. Off it goes. We loaded our movieNames, and now it's off actually computing those similarities. And there we have our results. So that's pretty good. That was a reasonably small amount of time for a very complicated operation. Doing a self-join on a big dataset is no small feat, mind you. And yeah, we got back the top results for Pulp Fiction. It turns out that's what movieID 56 is. It's kind of a gritty movie, and it came back with more gritty movies, so it actually seemed to work. I've never seen Smoke before. I don't actually like Pulp Fiction, so I'm not sure I'd actually wanna go watch that myself. But Reservoir Dogs, Donnie Brasco, True Romance, these are all reasonable results for movies similar to Pulp Fiction based on other users' ratings. So there you have it. If you were going to build a "people who liked this movie also liked" feature on Netflix or something like that, you now know how to do that, and you could actually scale it up using Apache Spark to handle a massive number of ratings or a massive number of movies, because you can actually throw a whole cluster at this now. So there you have it: an example of doing MovieSimilarities and item-based collaborative filtering, at least the first half of it, using Apache Spark.
And as you'll see later on, there's actually a built-in function in the machine learning library for Apache Spark that does something similar, but it doesn't actually generate results that are as good with the MovieLens dataset. So sometimes just using the off-the-shelf tools isn't good enough, and you need to go back and be inventive and implement new algorithms in Spark that maybe haven't been done before. Again, that's what they're gonna pay you the big bucks for, guys. That's a good example, and we're gonna wrap up on that one. Before we do though, I'm gonna challenge you to actually make this better. So let's talk about that in our next lecture.