Spark / SPARK-3066

Support recommendAll in matrix factorization model

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.4.0
    • Component/s: MLlib
    • Labels: None

      Description

      ALS returns a matrix factorization model, which we can use to predict ratings for individual queries as well as small batches. In practice, users may want to compute top-k recommendations offline for all users. This is very expensive, but it is a common problem. We can apply optimizations such as the following (a code sketch follows the list):

      1) collect one side (either user or product) and broadcast it as a matrix
      2) use level-3 BLAS to compute inner products
      3) use Utils.takeOrdered to find top-k
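
      A minimal sketch of steps 1)-3), assuming the factors are available as RDD[(Int, Array[Double])]. Spark's internal BLAS wrapper and Utils.takeOrdered are private, so this sketch uses Breeze instead (a transposed matrix product for the level-3 step, argtopk for the selection); the block size and method name are illustrative:

      import breeze.linalg.{DenseMatrix, argtopk}
      import org.apache.spark.rdd.RDD

      def recommendAllSketch(
          userFeatures: RDD[(Int, Array[Double])],
          productFeatures: RDD[(Int, Array[Double])],
          rank: Int,
          k: Int): RDD[(Int, Array[(Int, Double)])] = {
        val sc = userFeatures.sparkContext
        // 1) collect the (smaller) product side and broadcast it as a dense
        //    rank x numProducts matrix; column j holds product j's features
        val products = productFeatures.collect()
        val bcIds = sc.broadcast(products.map(_._1))
        val bcMat = sc.broadcast(new DenseMatrix(rank, products.length, products.flatMap(_._2)))
        userFeatures.mapPartitions { iter =>
          iter.grouped(4096).flatMap { block =>
            // 2) level-3 BLAS: one matrix-matrix multiply scores a whole block of users
            val userMat = new DenseMatrix(rank, block.length, block.flatMap(_._2).toArray)
            val scores = bcMat.value.t * userMat // numProducts x blockSize
            block.iterator.zipWithIndex.map { case ((userId, _), j) =>
              // 3) keep only the top-k products for this user, highest score first
              val col = scores(::, j)
              val top = argtopk(col, k).map(i => (bcIds.value(i), col(i))).toArray.sortBy(-_._2)
              (userId, top)
            }
          }
        }
      }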


          Activity

          debasish83 Debasish Das added a comment -

          Xiangrui Meng I am testing the recommendAllUsers and recommendAllProducts APIs, and I will add the code to the RankingMetrics PR:
          https://github.com/apache/spark/pull/3098

          I have not used level-3 BLAS yet, since we should be able to re-use the DistributedMatrix API that's coming online (here all the matrices are dense). I used ideas 1 and 2, and I also added a skipRatings parameter to the API (with it you can skip the ratings each user has already provided; for the validation I basically skip the train set).

          Example API:

          def recommendAllUsers(num: Int, skipUserRatings: RDD[Rating]) = {
            // key the ratings to skip by (user, product)
            val skipUsers = skipUserRatings.map { x => ((x.user, x.product), x.rating) }
            val productVectors = productFeatures.collect()
            recommend(productVectors, userFeatures, num, skipUsers)
          }

          def recommendAllProducts(num: Int, skipProductRatings: RDD[Rating]) = {
            // key the ratings to skip by (product, user)
            val skipProducts = skipProductRatings.map { x => ((x.product, x.user), x.rating) }
            val userVectors = userFeatures.collect()
            recommend(userVectors, productFeatures, num, skipProducts)
          }

          apachespark Apache Spark added a comment -

          User 'debasish83' has created a pull request for this issue:
          https://github.com/apache/spark/pull/3098

          debasish83 Debasish Das added a comment -

          On our internal datasets, flatMap is slow. I am changing the code to have two methods (assuming users are tall and products are skinny); if both users and products are tall and wide, then we need to rethink. A sketch of the second pattern follows.

          recommendAllUsers: takeOrdered is called on each userFeature dot productFeatures.

          recommendAllProducts: mapPartitions will emit Iterator(productId, userPriorityQueue) and reduceByKey will generate the topK users for each product.
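
          A minimal sketch of that mapPartitions/reduceByKey pattern (not the PR's code; the scored-pairs input and the helper name are assumptions):

          import scala.collection.mutable
          import org.apache.spark.rdd.RDD

          // scores: (productId, (userId, score)) pairs, e.g. from a blocked multiply
          def topKUsersPerProduct(
              scores: RDD[(Int, (Int, Double))],
              k: Int): RDD[(Int, Array[(Int, Double)])] = {
            // reversed ordering makes dequeue() drop the current minimum score
            val ord = Ordering.by[(Int, Double), Double](_._2).reverse
            scores.mapPartitions { iter =>
              val queues = mutable.Map.empty[Int, mutable.PriorityQueue[(Int, Double)]]
              iter.foreach { case (productId, scored) =>
                val q = queues.getOrElseUpdate(productId, mutable.PriorityQueue.empty(ord))
                q.enqueue(scored)
                if (q.size > k) q.dequeue()
              }
              queues.iterator // emit (productId, per-partition top-k queue)
            }.reduceByKey { (q1, q2) =>
              q2.foreach { e => q1.enqueue(e); if (q1.size > k) q1.dequeue() }
              q1
            }.mapValues(_.dequeueAll.reverse.toArray) // descending by score
          }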

          debasish83 Debasish Das added a comment - edited

          Xiangrui Meng as per our discussions, I added APIs for batch user and product recommendation, plus MAP computation for recommending the topK products for users.

          Note that I don't use reservoir sampling; I used your idea of filtering out the test-set users for which no model was built. I thought reservoir sampling should be part of a separate PR.

          APIs added (a usage sketch of the fixed-k variants follows the list):

          recommendProductsForUsers(num: Int): topK is fixed for all users
          recommendProductsForUsers(userTopK: RDD[(Int, Int)]): variable topK for every user

          recommendUsersForProducts(num: Int): topK is fixed for all products
          recommendUsersForProducts(productTopK: RDD[(Int, Int)]): variable topK for every product
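
          A hedged usage sketch of the fixed-k variants on a trained model; the ratings RDD and parameter values below are illustrative:

          import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

          // train as usual; ratings: RDD[Rating]
          val model: MatrixFactorizationModel =
            ALS.train(ratings, rank = 25, iterations = 10, lambda = 0.065)

          // top 10 products for every user and top 10 users for every product,
          // each returned as RDD[(Int, Array[Rating])]
          val userRecs = model.recommendProductsForUsers(10)
          val productRecs = model.recommendUsersForProducts(10)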

          I used mllib BLAS for all the computation and cleaned up the DoubleMatrix code from MatrixFactorizationModel. I have not used level-3 BLAS yet; I can add that as well if the rest of the flow makes sense.

          On examples.MovieLensALS we can activate the user MAP calculation using the --validateRecommendation flag:

          ./bin/spark-submit --master spark://localhost:7077 --jars scopt_2.10-3.2.0.jar --total-executor-cores 4 --executor-memory 4g --driver-memory 1g --class org.apache.spark.examples.mllib.MovieLensALS ./examples/target/spark-examples_2.10-1.3.0-SNAPSHOT.jar --kryo --lambda 0.065 --validateRecommendation hdfs://localhost:8020/sandbox/movielens/

          Got 1000209 ratings from 6040 users on 3706 movies.
          Training: 799617, test: 200592.
          Test RMSE = 0.8495476608536306.
          Test users 6032 MAP 0.03798337814233403
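
          For reference, a sketch of how a MAP number like the one above can be computed with mllib's RankingMetrics; userRecs and testUserProducts are assumed inputs (per-user recommendations and held-out product ids):

          import org.apache.spark.mllib.evaluation.RankingMetrics
          import org.apache.spark.mllib.recommendation.Rating
          import org.apache.spark.rdd.RDD

          def meanAveragePrecision(
              userRecs: RDD[(Int, Array[Rating])],
              testUserProducts: RDD[(Int, Array[Int])]): Double = {
            // line up predicted product ids with the held-out ground truth per user
            val predictionAndLabels = userRecs.join(testUserProducts).values
              .map { case (recs, actual) => (recs.map(_.product), actual) }
            new RankingMetrics(predictionAndLabels).meanAveragePrecision
          }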

          I will run the Netflix dataset and report the MAP measures for it.

          On our internal datasets, I have tested topK users for each product with 1M users, 10K products, 120 cores, and 240 GB of memory; that takes around 5 minutes, and on average I generate a ranked list of 6000 users for each product. Basically, internally we are using the batch API:

          recommendUsersForProducts(productTopK: RDD[(Int, Int)]): variable topK for every product

          debasish83 Debasish Das added a comment -

          I did experiments on the MovieLens 1M dataset with varying rank, on my localhost Spark with 4 GB RAM and 4 cores, to see how much MAP improves as the rank is scaled. Every run read 1000209 ratings from 6040 users on 3706 movies; rank=10 is the default:

          rank | training | test   | test RMSE          | test users | test MAP             | runtime
          10   | 799747   | 200462 | 0.8528377625407709 | 6036       | 0.03851426277536059  | 30 s
          25   | 800417   | 199792 | 0.8518001349769724 | 6037       | 0.04508057348514959  | 30 s
          50   | 800823   | 199386 | 0.8487416471685229 | 6038       | 0.05145126538369158  | 42 s
          100  | 800720   | 199489 | 0.8508095863317275 | 6033       | 0.0561225429735388   | 1.5 m
          150  | 800257   | 199952 | 0.8435902056186158 | 6035       | 0.05855252471878818  | 3.6 m
          200  | 800356   | 199853 | 0.8452385688272382 | 6037       | 0.059176892052172934 | 7.4 m

          I will run MovieLens 10M and the Netflix dataset and generate the numbers for them with varying ranks as well; I need to run those on a cluster.

          josephkb Joseph K. Bradley added a comment -

          Are there approximate methods which would be faster? On single machines, there are data structures for finding approximate nearest neighbors quickly. I'm not sure about distributed data structures.

          debasish83 Debasish Das added a comment -

          Joseph K. Bradley do you mean kNN? For recommendation, I am not sure how you can find the top-k until you do the dot products. Level-3 BLAS will definitely give a big boost since it's all blocked dense-with-dense multiplication. For https://issues.apache.org/jira/browse/SPARK-4823 I am looking into dense-dense BLAS and dense-sparse BLAS; ideally there we can add a kNN-based optimization followed by the row similarity calculation.

          josephkb Joseph K. Bradley added a comment -

          It's similar, I believe, for ALS. The cosine similarity metric you get with the dot product for ALS is a distance metric, right? So finding the top K products to recommend to a given user is essentially the same as finding the K product feature vectors which are closest to the user's feature vector. This optimization could be used both for recommending for a single user and for recommendAll.

          I'm not sure how effective these approximate nearest neighbor methods are. My understanding is that they work reasonably well as long as the feature space is fairly low-dimensional, which should often be the case for ALS.

          My hope is that these approximate nearest neighbor data structures can reduce communication. The ones I've seen are based on feature space partitioning, which could potentially allow you to figure out a subset of product partitions to check for each user.

          Using level-3 BLAS might be better; I'm really not sure. It won't reduce communication, though. These two types of optimizations might be orthogonal, anyway.

          srowen Sean Owen added a comment -

          The top-k recs problem is not quite a nearest-neighbor problem: the dot product isn't cosine similarity, and cosine distance isn't even a distance metric. Yes, the way forward I know of is LSH, to reduce the space of candidates to consider.

          josephkb Joseph K. Bradley added a comment -

          Oops, true, not an actual metric. LSH sounds reasonable. Do you know of use cases or how well it's been found to work for recommendation problems?

          srowen Sean Owen added a comment -

          My anecdotal experience with it was that getting an order-of-magnitude speedup meant losing a small but noticeable amount of quality in the top recommendations. That is, you would fail to consider as candidates some of the items that were actually top recs.

          The most actionable test / implementation I have to show this for ALS is ... https://github.com/cloudera/oryx/blob/master/als-common/src/it/java/com/cloudera/oryx/als/common/candidate/LocationSensitiveHashIT.java This could let you run tests for a certain scale, certain degree of hashing, etc., if you wanted to.

          I've actually tried to avoid needing LSH just for speed in order to avoid this tradeoff.

          Anyway, as for papers: I found this pretty complex treatment:
          http://papers.nips.cc/paper/5329-asymmetric-lsh-alsh-for-sublinear-time-maximum-inner-product-search-mips.pdf

          This has a little info on the quality of LSH:
          https://fruct.org/sites/default/files/files/conference15/Ponomarev_LSH_P2P.pdf

          It's one of those things where I'm sure it can be done better than the basic ways I know to do it, but haven't yet found a killer paper.
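
          For concreteness, a minimal sketch of one basic LSH scheme (sign random projections) of the kind discussed here; the class name and parameters are illustrative, and real systems add multiple hash tables and probing to recover quality:

          import scala.util.Random

          // numBits must be <= 31 so the signature fits in an Int
          class SignRandomProjection(rank: Int, numBits: Int, seed: Long = 42L) {
            private val rng = new Random(seed)
            // one random hyperplane per signature bit
            private val planes: Array[Array[Double]] =
              Array.fill(numBits)(Array.fill(rank)(rng.nextGaussian()))

            // vectors with the same bit pattern land in the same bucket
            def signature(v: Array[Double]): Int = {
              var sig = 0
              for (b <- 0 until numBits) {
                var dot = 0.0
                for (i <- 0 until rank) dot += planes(b)(i) * v(i)
                if (dot >= 0) sig |= (1 << b)
              }
              sig
            }
          }

          Product vectors are bucketed by signature offline; at query time only the products sharing (or nearly sharing) the user's signature are scored exactly, which is where the speed/quality tradeoff above comes from.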

          josephkb Joseph K. Bradley added a comment -

          Thanks for the references! I'll take a look, but based on what you say, perhaps focusing on BLAS is the best path for now.

          debasish83 Debasish Das added a comment -

          We use the non-level-3 BLAS code in our internal flows with ~60M x 3M datasets, and the runtime is decent. I am moving to level-3 BLAS for SPARK-4823, and I think the speed will improve further.

          debasish83 Debasish Das added a comment - edited

          Also, unless the raw flow runs, there is no way to validate how well an LSH-based flow is doing. I updated the PR today with Xiangrui Meng's reviews. I am working on level-3 BLAS routines for item-item similarity calculation from the matrix factors, and the same optimization can be applied here; I will open the PR for that in the coming weeks. We already have a JIRA for rowSimilarities.

          apachespark Apache Spark added a comment -

          User 'mengxr' has created a pull request for this issue:
          https://github.com/apache/spark/pull/5829

          mengxr Xiangrui Meng added a comment -

          Issue resolved by pull request 5829
          https://github.com/apache/spark/pull/5829


            People

            • Assignee: debasish83 Debasish Das
            • Reporter: mengxr Xiangrui Meng
            • Votes: 0
            • Watchers: 7
