Details

      Description

      Create a version of cooccurrence analysis (RowSimilarityJob with LLR) that runs on Spark. This should be compatible with the Mahout Spark DRM DSL so that a DRM can be used as input.

      Ideally this would extend to cover MAHOUT-1422. Cross-cooccurrence has several applications, including cross-action recommendations.

      Attachments

      1. MAHOUT-1464.patch
        8 kB
        Sebastian Schelter
      2. MAHOUT-1464.patch
        8 kB
        Sebastian Schelter
      3. MAHOUT-1464.patch
        8 kB
        Sebastian Schelter
      4. MAHOUT-1464.patch
        18 kB
        Sebastian Schelter
      5. MAHOUT-1464.patch
        18 kB
        Sebastian Schelter
      6. MAHOUT-1464.patch
        16 kB
        Sebastian Schelter
      7. run-spark-xrsj.sh
        1 kB
        Pat Ferrel

        Activity

        ssc Sebastian Schelter added a comment -

        I've started to work on this.

        pferrel Pat Ferrel added a comment - edited

        Good news. At the risk of asking for too much, what are your thoughts on the XRSJ, MAHOUT-1422? Speaking for myself I'd rather have MAHOUT-1464 than nothing.

        Also, for testing, would you recommend Hadoop 2 for Spark? It seems to be their recommended setup, but I have 1.2.1 now.

        dlyubimov Dmitriy Lyubimov added a comment -

        I only ever ran Spark code against the HDFS cluster of CDH 4. The MapReduce API, which is where most of the 2.0-vs-1.x differences are, is irrelevant; only HDFS matters, since Spark doesn't need an MR cluster. Spark can also run under YARN supervision, which would imply 2.0, but I would strongly recommend against that and would use Mesos plus ZooKeeper instead.

        dlyubimov Dmitriy Lyubimov added a comment -

        PS: the spark module has a cdh4 Maven profile.

        ssc Sebastian Schelter added a comment -

        @Pat I'm pretty busy with non-Mahout stuff until end of April, not sure how far I can get with this until then, unfortunately.

        pferrel Pat Ferrel added a comment -

        So am I, so no problem.

        My plan is to update the Solr-recommender contrib with Spark for calculation of the indicator/similarity matrix. For the one-action recommender this only needs RSJ; for the two-action recommender it means matrix transpose and multiply OR XRSJ. The PreparePreferenceMatrixJob and its analog, PrepareActionMatricesJob, will stay in plain old Hadoop for now. Not sure there is much benefit in moving a dataflow process to Spark.

        ssc Sebastian Schelter added a comment -

        I haven't tested Spark on Hadoop yet; I ran it standalone on the cluster and had it read data from HDFS.

        ssc Sebastian Schelter added a comment - edited

        I'd like to rework my prototype from directly using Spark to using Dmitriy's DSL, but there are a few operators and mechanics that I need to add.

        pferrel Pat Ferrel added a comment -

        OK, refreshed the repo and now I see all the Spark/Scala stuff.

        Not sure what you mean by "standalone in the cluster"? I'm just getting up to speed on Spark, and they describe integrating with Hadoop. I was asking because I need to set up a clustered environment and really only have one cluster, so Spark and Hadoop will coexist on the same machines. I'll probably stay on Hadoop 1.2.1 as Dmitriy suggests.

        So you plan to add to D's PDF doc? If so maybe it would be good to check it in or add it to the Github wiki or cwiki. Even if it's only a link to the PDF we'd all know where to look for the latest.

        ssc Sebastian Schelter added a comment -

        Luckily, Dmitriy's latest commit solved most of my problems as it provides the machinery to compute column sums and broadcast drms. Awesome work, Dmitriy.

        Here is a first hacky version of cooccurrence analysis with downsampling and LLR using Dmitriy's new DSL.

        Successfully tested on the movielens1M dataset on my laptop.
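
        For readers coming to this fresh, here is a small, self-contained sketch (identifiers are illustrative assumptions, not the patch's code) of how an individual cooccurrence count can be turned into an LLR score with mahout-math's LogLikelihood; the patch applies this kind of scoring to the entries of the downsampled cooccurrence matrix.

            import org.apache.mahout.math.stats.LogLikelihood

            // Build the 2x2 contingency table from the cooccurrence count, the two
            // item marginals and the total number of interactions, then score it.
            def llr(cooccurrences: Long, interactionsA: Long, interactionsB: Long, numInteractions: Long): Double = {
              val k11 = cooccurrences                                                    // A and B together
              val k12 = interactionsA - cooccurrences                                    // A without B
              val k21 = interactionsB - cooccurrences                                    // B without A
              val k22 = numInteractions - interactionsA - interactionsB + cooccurrences  // neither
              LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
            }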

        dlyubimov Dmitriy Lyubimov added a comment -

        What's the best way to share the PDF source? I can put it on the site so committers can re-generate it. Otherwise, its source is currently in my GitHub doc branch, and a pull request is definitely a possible way to collaborate too: https://github.com/dlyubimov/mahout-commits/tree/ssvd-docs

        dlyubimov Dmitriy Lyubimov added a comment -

        1.

        val C = A.t %*% A
        

        I don't remember if I actually put in the physical operator for non-skinny A. There are two distinct algorithms to deal with it. The skinny one (n <= 5000 or so) uses an upper-triangular, vector-backed accumulator to combine results right in the map (a small plain-Scala sketch of this skinny accumulation follows below). Of course, if the accumulator does not realistically fit in memory, then another algorithm has to be plugged in for A-squared. See AtA.scala, def at_a_nongraph(). It currently throws UnsupportedOperation (but everything I have done so far only uses slim A'A).

        2. When using partial functions with mapBlock, you actually do not have to use ({...}) but just {...}:

              drmBt = drmBt.mapBlock() {
                case (keys, block) =>
        //...
                  keys -> block
              }
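
        As a plain-Scala illustration of the "skinny" accumulator idea in point 1 (a sketch only, not the actual AtA.scala operator; it assumes the whole n x n result fits in memory): accumulate the per-row outer products into the upper triangle only, which roughly halves the flops, and mirror at the end.

            def slimAtA(rows: Seq[Array[Double]], n: Int): Array[Array[Double]] = {
              val c = Array.ofDim[Double](n, n)
              for (r <- rows; i <- 0 until n if r(i) != 0.0; j <- i until n)
                c(i)(j) += r(i) * r(j)             // accumulate the upper triangle in the "map"
              for (i <- 1 until n; j <- 0 until i)
                c(i)(j) = c(j)(i)                  // mirror into the lower triangle at the end
              c
            }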
        
        dlyubimov Dmitriy Lyubimov added a comment -

        Also, just FYI: much as I love to use single-letter capitals for matrices, it turns out Scala does not accept them in all situations. For example,

        val (U, V, s) = ssvd(...)
        

        doesn't compile.

        So I ended up using, perhaps verbosely, the drmA and inCoreA notations. Perhaps we can agree on what's reasonable.
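
        A plain-Scala illustration of why that fails (not Mahout code): in a pattern, an identifier that starts with an uppercase letter is taken as a reference to an existing value to match against, not as a new binding.

            object NamingDemo extends App {
              def ssvd(x: Int): (Int, Int, Int) = (x, x + 1, x + 2)

              val (u, v, s) = ssvd(1)       // compiles: lowercase names are bound as pattern variables
              // val (U, V, S) = ssvd(1)    // does not compile: "not found: value U"
              println(u + " " + v + " " + s)
            }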

        ssc Sebastian Schelter added a comment -

        Updated the patch to match the coding conventions and to use RLikeOps where possible.

        ssc Sebastian Schelter added a comment -

        The physical operator for non-skinny A'A is not yet implemented. For my tests with movielens1M, I forced the execution of the in-memory one by setting mahout.math.AtA.maxInMemNCol to an appropriate value.
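
        For reference, a hedged one-liner of how that forcing might look (this assumes the threshold is read as a JVM system property, which the comment above implies but is not verified here):

            // assumption: AtA reads this limit via System.getProperty
            System.setProperty("mahout.math.AtA.maxInMemNCol", "10000")
            // or on the command line: -Dmahout.math.AtA.maxInMemNCol=10000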

        pferrel Pat Ferrel added a comment -

        PDF in the repo is fine by me.

        Can the patches just be branches in a git repo? That's really what a branch is after all. When I make changes to Mahout I fork it, make changes in a branch with apache/mahout as the upstream repo (you guys wouldn't even need to fork Mahout, just stay in the branch). That would make it super easy for everyone to follow changes to the "patch" by just pulling the latest from your branch.

        With any luck these may be the FIRST Mahout jobs people use in a few years, so I wouldn't assume they are already familiar with the in-memory code, the Hadoop jobs, the math literature, or R for that matter. Just saying that you may want to consider the future audience.

        dlyubimov Dmitriy Lyubimov added a comment -

        That's what I normally do, yes. The scalabindings issue points to a branch on GitHub. Then there's the commit-squash method (described in my blog) that I use when pushing to svn. Hopefully we'll see direct git pushes for Mahout sooner rather than later.
        However, seeing a combined ("squashed") patch is pretty useful too, as opposed to tons of individual commits.

        dlyubimov Dmitriy Lyubimov added a comment - edited

        @Sebastian Schelter Looking nice.

        I guess we still want the non-skinny version of the A'A operator; I may be able to look into it.

        pferrel Pat Ferrel added a comment -

        Since there are potentially commits by D and S around Spark, what's the best way to track? I assume only RSJ is an issue since the rest will go to the trunk if changed?

        Sebastian, do you plan to use git or update patches on this issue?

        Dmitriy, can you send me a link to your blog post? I assume you use something like git diff to do the squashed patch; yes, very helpful.

        dlyubimov Dmitriy Lyubimov added a comment -

        http://weatheringthrutechdays.blogspot.com/2011/04/git-github-and-committing-to-asf-svn.html

        dlyubimov Dmitriy Lyubimov added a comment -

        Actually, the non-slim A'A operator is practically A'B without the need for a zip... So we are almost done; the biggest work here is the test, I suppose.

        ssc Sebastian Schelter added a comment - edited

        Replaced matrix.viewRow(i) calls with the nicer-looking matrix(i, ::) access.

        dlyubimov Dmitriy Lyubimov added a comment -

        Yeah, those views... I think they create at least 2 interim objects; not so cool for mass iterations. Oh well.

        ssc Sebastian Schelter added a comment -

        In a SparseRowMatrix, this is only an array access. Can we intelligently use a SparseRowMatrix instead of a SparseMatrix in such cases?
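
        A small illustration of the point (a sketch assuming the in-core scala bindings are on the classpath; the values are made up): a SparseRowMatrix keeps its rows as vectors, so the R-like row access maps onto a cheap lookup.

            import org.apache.mahout.math.SparseRowMatrix
            import org.apache.mahout.math.scalabindings._

            val m = new SparseRowMatrix(3, 5)
            m(0, 2) = 1.0                    // set a single cell
            val row0 = m(0, ::)              // same as m.viewRow(0); cheap for a SparseRowMatrix
            println(row0.getNumNondefaultElements)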

        dlyubimov Dmitriy Lyubimov added a comment -

        I have non-slim A'A. Of course, the slim operator implementation is upper-triangular, which cuts the outer-product computation cost in half by comparison... Significantly wide A'A, on the other hand, cannot really apply the same cut, since it needs to form rows in a distributed way.

        Not surprisingly, the slim test takes 17 seconds and the "fat" one takes 21 seconds on my fairly ancient computer for squaring a 400x550 matrix (single thread). Actually, I expected a more significant gap.

        I wonder if there's a more interesting way to do this other than forming vertical blocks of outer products.

        Maybe I need to use square blocks. In that case I can reuse roughly half of them, but then there will be significantly more objects (albeit smaller in size), and I will still have to add an extra shuffle operation to form the lower-triangular part of the matrix.

        Anyway, I think I will commit what I have.

        dlyubimov Dmitriy Lyubimov added a comment -

        For that very reason, I almost always use SRM and almost never SM.

        What I would really love is a sparse row-and-column block (a hash hanging off a hash); this seems like a recurring issue in blocking calculations such as ALS. SRM essentially does that, except it uses a full-size vector to hang the sparse vectors off.

        ssc Sebastian Schelter added a comment -

        One possibility would be to allow users to give a "hint" to mapBlock operations, so that the underlying blockify call can choose an appropriate representation. Would that make sense?

        dlyubimov Dmitriy Lyubimov added a comment -

        No, I think blockify is fine. It probably could run a bit faster than it does, but oh well.

        And mapBlock doesn't trigger it (or, rather, it is evaluated lazily; if the previous operator already produced blocks, then blockify is not used). What I was saying is along the lines of the A'A computation. There's a structure that is used to fuse operators, which is a sort of "either-or" of the DrmRdd and BlockifiedDrmRdd types. I came to the conclusion that there are operators that are an absolute pain to implement on blocks, and there are some that would be a pain to implement on row-vector bags. But blocks can be presented as row bags via viewing, so conversion to blocks happens only if a subsequent operator requires it. What's more, a block operator usually outputs blocks as well, and vice versa, so realistically blockify doesn't happen very often at all.

        Another caveat is that one has to be careful with mapBlock side effects on the RDD of origin. Even though Spark says all RDDs are immutable, side effects will stay visible to parent RDDs if they are cached as MEMORY_ONLY or MEMORY_AND_DISK (i.e., without a mandatory clone-via-serialization in the block manager) and then subsequently used as a source again.
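
        A hedged sketch of how to stay on the safe side of that caveat (identifiers are illustrative, and it assumes a DRM drmA plus the R-like in-core operators are in scope, as in the snippets earlier in this issue): copy the block before mutating anything in place.

            val drmScaled = drmA.mapBlock() { case (keys, block) =>
              // never mutate `block` in place: if the source RDD is cached
              // MEMORY_ONLY, the cached parent data would be mutated too
              val copy = block.like()        // empty matrix of the same geometry
              copy := block                  // copy the values
              copy *= 2.0                    // example transformation, applied to the copy only
              keys -> copy
            }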

        dlyubimov Dmitriy Lyubimov added a comment -

        Oh, you mean in the case of sparse row vectors.
        You are probably right. Indeed, there's currently a SparseMatrix there in this case. I think it should be a SparseRowMatrix, of course; most cases should benefit from it. The problem is, like I said, mapBlock doesn't really form it, and no other physical operator has any knowledge of what formed it.

        It is possible to optimize the entire operator fusion chain based on the subsequent operator's preferred type; that's actually a very neat idea for in-core speed optimization, but I have no capacity to pursue this technique at the moment. It needs some digestion anyway (at least on my end), and it requires experiments with in-core operations. At first glance, most non-multiplicative operators would be OK with a row-wise matrix, as well as with deblockifying views.

        dlyubimov Dmitriy Lyubimov added a comment -

        If anything, I see a non-negligible speed-up in the blockification itself once I use the row matrix. I think I will commit that.

        hudson Hudson added a comment -

        SUCCESS: Integrated in Mahout-Quality #2530 (See https://builds.apache.org/job/Mahout-Quality/2530/)
        MAHOUT-1464 'Fat' nongraph physical A'A.
        Refactored decompositions package out of drm package.
        Added some kryo and akka related properties to the test setup. (dlyubimov: rev 1579565)

        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DQR.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSPCA.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/decompositions/DSSVD.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/io/WritableKryoSerializer.scala
        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
        • /mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions
        • /mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/decompositions/MathSuite.scala
        • /mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
        • /mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/decompositions/MathSuite.scala
        • /mahout/trunk/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
        hudson Hudson added a comment -

        SUCCESS: Integrated in Mahout-Quality #2531 (See https://builds.apache.org/job/Mahout-Quality/2531/)
        MAHOUT-1464 using RowMatrix with blockify() (dlyubimov: rev 1579574)

        • /mahout/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
        kanjilal Saikat Kanjilal added a comment -

        Sebastian,
        Can I help out on this issue, or are you actively planning to fix it on your own?

        ssc Sebastian Schelter added a comment - edited

        It would be awesome if you could take the patch and test it on a large dataset on a cluster. So far I have only tested it locally.

        kanjilal Saikat Kanjilal added a comment -

        We have a cluster at work; however, I'm not sure if I can actually use it. Do we have an AWS account for Mahout that I can use? I think this might be a good opportunity for us to get one for Mahout.

        pferrel Pat Ferrel added a comment -

        Adding 16 cores to my closet's cluster next week. Is there a 'large' dataset you have in mind? I have one with 4,000 rows, 75,000 columns and 700,000 values, but that seems smallish. Can't say when I'll get to it, but it's on my list. If someone can jump in quicker, have at it.

        kanjilal Saikat Kanjilal added a comment -

        +1 on Andrew's suggestion of using AWS to do this. Andrew, is it possible to have a shared account that Mahout contributors can use? I'd even be willing to chip in donations to have a shared AWS account.

        andrew.musselman Andrew Musselman added a comment -

        Saikat Kanjilal Not sure how to make a shared account work, but do you guys have a scripted test sequence you could share? I'd be happy to spin up an EMR instance and try it out.

        dlyubimov Dmitriy Lyubimov added a comment - edited

        Adding 16 cores to my closet's cluster next week. Is there a 'large' dataset you have in mind? I have one with 4000 rows, 75,000 columns and 700,000 values but that seems smallish. Can't say when I'll get to it but it's on my list. If someone can jump in quicker--have at it.

        @Sebastian, actually matrix squaring is incredibly expensive: size^1.5 for the flops alone. Did your original version also use matrix squaring? How did it fare?

        Also, since the flops grow power-law w.r.t. input size (it is a problem for ssvd, too), we may need to contemplate a technique that creates finer splits for such computations based on input size. It may very well be the case that the original hdfs splits turn out to be too large for adequate load redistribution.

        Technically, it is extremely simple: we'd just have to insert a physical operator tweaking RDD splits via a "shuffleless" coalesce(), which also costs nothing in Spark. However, I am not sure what would be a sensible API for this: automatic, semi-automatic, cost-based...

        I guess one brainless thing to do is to parameterize the optimizer context with a desired parallelism (~cluster task capacity) and have the optimizer insert physical operators that vary the # of partitions and do an automatic shuffleless coalesce if the number is too low (a sketch of such a step follows below).

        Any thoughts?
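
        A hedged Spark-level sketch of what such an inserted step could do (the helper name and its placement are assumptions, not Mahout API); note that in Spark a shuffle-free coalesce() can only merge partitions, so growing the partition count needs shuffle = true:

            import org.apache.spark.rdd.RDD

            def resizeSplits[T](rdd: RDD[T], desiredParallelism: Int): RDD[T] =
              if (rdd.partitions.length < desiredParallelism)
                rdd.coalesce(desiredParallelism, shuffle = true)    // growing requires a shuffle
              else
                rdd.coalesce(desiredParallelism, shuffle = false)   // shrinking can stay shuffle-free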

        ssc Sebastian Schelter added a comment -

        Pat Ferrel I planned to test the implementation on the R2 dataset from Yahoo Webscope [1] with 700M interactions. I regularly use that in the papers I write. You have to do a little paperwork with Yahoo to get access, however.

        Dmitriy Lyubimov The original implementation also uses matrix squaring. The input matrix is row-partitioned; mappers compute outer products of rows, emitting the resulting matrices row-wise, and reducers sum those up. The downsampling applied beforehand helps cut down the cost of that multiplication (a compact sketch of this scheme follows below).

        [1] http://webscope.sandbox.yahoo.com/catalog.php?datatype=r
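
        For readers unfamiliar with that MapReduce scheme, a compact plain-Scala sketch of it (collections stand in for the map and reduce phases; names are illustrative): every row a of A contributes the outer product a a', emitted row by row, and the partial rows are summed per output row.

            def ataByOuterProducts(rows: Seq[Array[Double]], n: Int): Map[Int, Array[Double]] = {
              val emitted = for {
                a <- rows
                i <- 0 until n if a(i) != 0.0
              } yield i -> Array.tabulate(n)(j => a(i) * a(j))      // "map": row i of the outer product a a'

              emitted.groupBy(_._1).map { case (i, partials) =>     // "reduce": sum the partial rows per key
                i -> partials.map(_._2).reduce((x, y) => (x, y).zipped.map(_ + _))
              }
            }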

        pferrel Pat Ferrel added a comment -

        I tried, but you have to have a .edu or university email address. If the license allows, you can always put it where you put the last dataset. In the meantime I'll see if I still have an alum address.

        pferrel Pat Ferrel added a comment -

        OK, I do have an alum address but it takes some time to set up.

        ssc Sebastian Schelter added a comment -

        Updated patch that now also contains cross-co-occurrence analysis. It includes an example using the epinions dataset, which consists of user-item ratings and a user-user trust network.

        Has only been tested locally at this point.

        pferrel Pat Ferrel added a comment -

        These? http://konect.uni-koblenz.de/networks/epinions-rating http://konect.uni-koblenz.de/networks/epinions

        ssc Sebastian Schelter added a comment -

        I used those files here:

        http://www.trustlet.org/datasets/downloaded_epinions/ratings_data.txt.bz2
        http://www.trustlet.org/datasets/downloaded_epinions/trust_data.txt.bz2

        RunCrossCooccurrenceAnalysisOnEpinions in org.apache.mahout.cf.examples.Recommendations.scala shows how to run cross-co-occurrence on these files (code in the patch).

        tdunning Ted Dunning added a comment -

        Is there any strong reason to use the similarity form of LLR here? It obscures what is going on, and the only real reason is to invert the sort order and match code that isn't being used here.

        ssc Sebastian Schelter added a comment -

        Ted Dunning Good catch; the similarity form is just an artifact from the old codebase. Removed it and updated the patch.

        ssc Sebastian Schelter added a comment -

        Updated patch. Removed the similarity form of LLR, removed math-specific code that was addressed in MAHOUT-1508, added nicer output and made a few cosmetic changes.

        I think this code is ready to be tested on a cluster; does anybody have time for that?

        pferrel Pat Ferrel added a comment -

        I have some time this week so working on it.

        pferrel Pat Ferrel added a comment - edited

        OK, I have a cluster set up but first tried locally on my laptop. I installed the latest Spark 0.9.1 (not the 0.9.0 called for in the pom, assuming this is OK), which uses Scala 2.10. BTW, the object RunCrossCooccurrenceAnalysisOnEpinions has an incorrect usage println in a comment (wrong object name). I never get the printlns; I assume because I'm not launching from the Spark shell???

        println("Usage: RunCooccurrenceAnalysisOnMovielens1M <path-to-dataset-folder>")

        This leads me to believe that you launch from the Spark Scala shell?? Anyway I tried the method called out in the Spark docs for CLI execution shown below and execute RunCrossCooccurrenceAnalysisOnEpinions via a bash script. Not sure where to look for output. The code says:

        RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0),
        "/tmp/co-occurrence-on-epinions/indicators-item-item/")
        RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(1),
        "/tmp/co-occurrence-on-epinions/indicators-trust-item/")

        I assume this is on the local fs since the data came from there? I see the Spark pids there but no temp data.

        Here's how I ran it.

        Put data in localfs:
        Maclaurin:mahout pat$ ls -al ~/hdfs-mirror/xrsj/
        total 29320
        drwxr-xr-x 4 pat staff 136 Apr 14 09:01 .
        drwxr-xr-x 10 pat staff 340 Apr 14 09:00 ..
        -rw-r--r--  1 pat  staff  8650128 Apr 14 09:01 ratings_data.txt
        -rw-r--r--  1 pat  staff  6357397 Apr 14 09:01 trust_data.txt

        Start up Spark on localhost, webUI says all is well.

        Run the xrsj on local data via shell script attached.

        The driver runs and creates a worker, which runs for quite a while, but the log says there was an ERROR.

        Maclaurin:mahout pat$ cat /Users/pat/spark-0.9.1-bin-hadoop1/sbin/../logs/spark-pat-org.apache.spark.deploy.worker.Worker-1-
        spark-pat-org.apache.spark.deploy.worker.Worker-1-Maclaurin.local.out spark-pat-org.apache.spark.deploy.worker.Worker-1-Maclaurin.local.out.2
        spark-pat-org.apache.spark.deploy.worker.Worker-1-Maclaurin.local.out.1 spark-pat-org.apache.spark.deploy.worker.Worker-1-occam4.out
        Maclaurin:mahout pat$ cat /Users/pat/spark-0.9.1-bin-hadoop1/sbin/../logs/spark-pat-org.apache.spark.deploy.worker.Worker-1-Maclaurin.local.out
        Spark Command: /System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home/bin/java -cp :/Users/pat/spark-0.9.1-bin-hadoop1/conf:/Users/pat/spark-0.9.1-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop1.0.4.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://Maclaurin.local:7077
        ========================================

        log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger).
        log4j:WARN Please initialize the log4j system properly.
        log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
        14/04/14 09:26:00 INFO Worker: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
        14/04/14 09:26:00 INFO Worker: Starting Spark worker 192.168.0.2:52068 with 8 cores, 15.0 GB RAM
        14/04/14 09:26:00 INFO Worker: Spark home: /Users/pat/spark-0.9.1-bin-hadoop1
        14/04/14 09:26:00 INFO WorkerWebUI: Started Worker web UI at http://192.168.0.2:8081
        14/04/14 09:26:00 INFO Worker: Connecting to master spark://Maclaurin.local:7077...
        14/04/14 09:26:00 INFO Worker: Successfully registered with master spark://Maclaurin.local:7077
        14/04/14 09:26:19 INFO Worker: Asked to launch driver driver-20140414092619-0000
        2014-04-14 09:26:19.947 java[53509:9407] Unable to load realm info from SCDynamicStore
        14/04/14 09:26:20 INFO DriverRunner: Copying user jar file:/Users/pat/mahout/spark/target/mahout-spark-1.0-SNAPSHOT.jar to /Users/pat/spark-0.9.1-bin-hadoop1/work/driver-20140414092619-0000/mahout-spark-1.0-SNAPSHOT.jar
        14/04/14 09:26:20 INFO DriverRunner: Launch Command: "/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home/bin/java" "-cp" ":/Users/pat/spark-0.9.1-bin-hadoop1/work/driver-20140414092619-0000/mahout-spark-1.0-SNAPSHOT.jar:/Users/pat/spark-0.9.1-bin-hadoop1/conf:/Users/pat/spark-0.9.1-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop1.0.4.jar:/usr/local/hadoop/conf" "-Xms512M" "-Xmx512M" "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://sparkWorker@192.168.0.2:52068/user/Worker" "RunCrossCooccurrenceAnalysisOnEpinions" "file://Users/pat/hdfs-mirror/xrsj"
        14/04/14 09:26:21 ERROR OneForOneStrategy: FAILED (of class scala.Enumeration$Val)
        scala.MatchError: FAILED (of class scala.Enumeration$Val)
        at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:277)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        14/04/14 09:26:21 INFO Worker: Starting Spark worker 192.168.0.2:52068 with 8 cores, 15.0 GB RAM
        14/04/14 09:26:21 INFO Worker: Spark home: /Users/pat/spark-0.9.1-bin-hadoop1
        14/04/14 09:26:21 INFO WorkerWebUI: Started Worker web UI at http://192.168.0.2:8081
        14/04/14 09:26:21 INFO Worker: Connecting to master spark://Maclaurin.local:7077...
        14/04/14 09:26:21 INFO Worker: Successfully registered with master spark://Maclaurin.local:7077

        pferrel Pat Ferrel added a comment -

        Wow, that really screwed up the shell script, so I've attached it.

        pferrel Pat Ferrel added a comment -

        Script used to execute the cross-similarity code on localhost Spark and the local filesystem.

        ssc Sebastian Schelter added a comment -

        Currently, the RunCooccurrenceAnalysisOnMovielens1M script only sets up a local Spark context and reads from and writes to the local fs. Sorry for not mentioning this upfront. Do you want to try to change it yourself, or should I update the patch?

        pferrel Pat Ferrel added a comment -

        I am running it locally, if by that you mean standalone on localhost, and I am actually running RunCrossCooccurrenceAnalysisOnEpinions.

        dlyubimov Dmitriy Lyubimov added a comment -

        Running using Spark Client (inside the cluster) is a new thing in 0.9. Even assuming it is stable, it is not supported at this point, and going this way will hit multiple hurdles.

        For one, the Mahout Spark context requires MAHOUT_HOME to set all the Mahout binaries properly. The assumption is that one needs Mahout's binaries only on the driver's side, but if the driver runs inside a remote cluster, this will fail. So our batches should really be started in one of the ways I described in an earlier email.

        Second, I don't think the driver can load classes reliably, because it includes Mahout dependencies such as mahout-math. That's another reason why using Client seems problematic to me: it assumes one has the entire application within that jar, which is not true here.

        That said, your attempt doesn't exhibit any direct ClassNotFounds and looks more like akka communication issues, i.e., Spark setup issues. One thing about Spark is that it requires direct port connectivity, not only between cluster nodes but also back to the client. In particular, it means your client must not firewall incoming calls and must not be behind NAT (even port forwarding doesn't really solve networking issues here). So my first bet would be on akka connectivity issues between the cluster and the client.

        pferrel Pat Ferrel added a comment -

        Not sure what you are saying; local as in the local filesystem and only a localhost Spark instance? How are you launching RunCrossCooccurrenceAnalysisOnEpinions?

        I should get the local working first, I see the context setup in the code and will worry about that after local works.

        Is there something wrong with the script?

        dlyubimov Dmitriy Lyubimov added a comment -

        Pat Ferrel, if you look inside Sebastian's patch, you will find it is hardcoded to use the "local" Spark master. The master you specify to Client only tells it which cluster to ship the code to, not which master the application should use. Which is why I think this Client thing is a bit of a raw idea. Either way, it will not work with Sebastian's app. Instead, I'd suggest you run Sebastian's script directly from IDEA as a first step, after hacking the master url in this line

        implicit val sc = mahoutSparkContext(masterUrl = "local", appName = "MahoutLocalContext",
          customJars = Traversable.empty[String])
        

        or make the script accept it from the environment or app params.

        (the convention in Spark example programs is that they accept master url as the first parameter).
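        A minimal sketch of that convention, assuming the usual org.apache.mahout.sparkbindings._ import for mahoutSparkContext; the object name, the MASTER environment variable and the fallback value are illustrative only, not part of Sebastian's patch:

        import org.apache.mahout.sparkbindings._

        object RunCooccurrenceExample {
          def main(args: Array[String]): Unit = {
            // Master URL from the first program argument, else from the environment, else "local".
            val masterUrl = args.headOption
              .orElse(sys.env.get("MASTER"))
              .getOrElse("local")

            implicit val sc = mahoutSparkContext(masterUrl = masterUrl,
              appName = "MahoutLocalContext",
              customJars = Traversable.empty[String])

            // ... rest of the driver as in the patch ...
          }
        }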

        pferrel Pat Ferrel added a comment -

        @Dmitriy, no clue which email you are talking about; you have written a lot lately. Where is it, on a JIRA?

        I did my setup and tried launching with Hadoop and Mahout running locally (MAHOUT_LOCAL=true), one localhost instance of Spark, passing in the 'mvn package' mahout spark jar from the local fs and pointing at data on the local fs. This is per the instructions on the Spark site. There is no firewall issue since it is always localhost talking to localhost.

        Anyway if I could find your "running mahout on spark" email it would probably explain what I'm doing wrong.

        You did see I was using Spark 0.9.1?

        pferrel Pat Ferrel added a comment -

        ok, no spark_client launch, got it.

        A pointer to the email would help.

        pferrel Pat Ferrel added a comment - - edited

        OK runs fine in IDEA, now I need some pointers for how to launch on the cluster.

        Should I be able to do that from IDEA as well by changing the context?

        dlyubimov Dmitriy Lyubimov added a comment -

        Yes, you should be able to both hack the context and launch the driver successfully from IDEA regardless of whether you are running the "local", "standalone/HA standalone" or "mesos/HA mesos" resource managers – as long as the resource managers are up and running on your cluster.

        dlyubimov Dmitriy Lyubimov added a comment -

        PS: in any mode other than "local" it will be looking for MAHOUT_HOME or -Dmahout.home= ... to point to the latest Mahout directory. This should have the latest binaries, including RSJ, to run in the backend; that's what it ships to the Spark app. (You don't need to recompile Mahout if all you changed was just the context hack.)

        pferrel Pat Ferrel added a comment -

        I'm running on the localhost spark://Maclaurin.local:7077 master now and getting out-of-heap errors. When I ran locally I just passed -Xms8000 to the JVM and that was fine.

        I had to hack the mahoutSparkContext code; there doesn't seem to be a way to pass in or modify the conf? Notice the 4g:

        conf.setAppName(appName).setMaster(masterUrl)
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
        .set("spark.executor.memory", "4g")

        This worked fine.
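        A hedged alternative to editing mahoutSparkContext, assuming its internal SparkConf is built with defaults loaded (so that spark.* JVM system properties are picked up) and that nothing later overrides the value; the 4g figure simply mirrors the hack above:

        // Set the executor memory as a system property before the context is created;
        // SparkConf loads spark.* system properties by default.
        System.setProperty("spark.executor.memory", "4g")

        // ...then build the context as before, e.g.
        // implicit val sc = mahoutSparkContext(masterUrl = "spark://Maclaurin.local:7077",
        //   appName = "MahoutLocalContext", customJars = Traversable.empty[String])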

        My dev machine is not part of the cluster and cannot participate because the path to scripts like start-slave.sh is different on the cluster and dev machine (Mac vs Linux). If I try to launch on the dev machine but point to a cluster managed by another machine it eventually tries to look in IDEA's WORKING_DIRECTORY/_temporary for something that is not there--maybe on the Spark Master?

        I need a way to launch this outside IDEA on a cluster machine; why shouldn't the spark_client method work?

        Anyway I'll keep trying to work this out, so far local and 'pseudo-cluster' work.

        ssc Sebastian Schelter added a comment -

        Do you run this with the movielens dataset? You shouldn't need that much memory for that.

        pferrel Pat Ferrel added a comment -

        Probably don't need that much here either, but there is 8g to 16g on all the machines.

        dlyubimov Dmitriy Lyubimov added a comment -

        That's odd. Honestly I don't know and have never encountered that. Maybe it is something the program itself does, not Spark? A stacktrace or log with some sort of complaint would be helpful.

        I know that with Mesos supervision, SPARK_HOME must be the same on all nodes (driver included). But I think this is only specific to the Mesos setup. The standalone backend should be able to handle different locations.

        I think I gave an explanation for this already. Mostly because it assumes the jar is all it takes to run the program, but it takes the entire Mahout distribution to run. And because it still doesn't pass the master to the program. IMO there's no real advantage to doing this vs. running a standalone application (perhaps with the exception of when you are running from a remote, slowly connected client and want to disconnect while the task is still running).

        pferrel Pat Ferrel added a comment - - edited

        Getting input from hdfs://occam4.local/user/pat/xrsj, the job seems able to complete up to the point where it tries to write the output. Then, running inside IDEA, I am unable to connect to the cluster HDFS master to write.

        I've never been able to have code write to HDFS from inside IDEA. I just run it from a bash script where my dev machine is configured as an HDFS client.

        Shouldn't using 'spark-class org.apache.spark.deploy.Client launch' give us this?

        BTW all the computation is indeed running on the cluster.

        Show
        pferrel Pat Ferrel added a comment - - edited Getting input from: hdfs://occam4.local/user/pat/xrsj the job seems able to complete up to the point where it tries to write the output. Then running inside IDEA I am unable to connect to the cluster HDFS master to write. I've never been able to have code write to HDFS from inside IDEA. I just run it from a bash script where my dev machine is configured as an HDFS client. Shouldn't using 'spark-class org.apache.spark.deploy.Client launch' give us this? BTW all the computation is indeed running on the cluster.
        Hide
        dlyubimov Dmitriy Lyubimov added a comment -

        IDEA is driver. but output is written by spark workers. Not the same
        environment, and in most cases, not the same machine. Just like it happens
        for MR reducers. Unless it is "local" master url. Which i assume it was not.

        This is strange. I can, was able to and will able to. why wouldn't it able
        to? unless there are network or security issues. There's nothing
        fundamentally different between reading/writing hdfs from a worker process
        or any other process.

        No. Spark client is about shipping driver and have it running somewhere
        else. it is as if somebody was running mahout cli command on one of the
        worker nodes. this is it. it knows nothing about hdfs – and even what the
        driver program is going to do. One might use the Client code to print out
        "Hello, World" and exit on some of the worker nodes, the Client wouldn't
        know or care. Using a worker to run driver programs, that's all it does.

        pferrel Pat Ferrel added a comment - - edited

        I think IDEA forces some things to run locally so it can keep track of threads or something. It seems to work correctly with Spark but not HDFS. There are ways to remote debug with it so it separates processes, but I don't need you to help me with IDEA.

        Seems easier to answer: How do I run this from the CLI? Let's get IDEA out of the picture. I bet it will just work.

        We need a way to run these from the CLI via cron or scripts anyway, right?

        Using spark-class I get no errors but no output either. It doesn't create the same Application name so I must be using it wrong. Will look later today.

        pferrel Pat Ferrel added a comment -

        Hmm, maybe I should ask if anyone has gotten this stuff to read/write to HDFS? I can get read to work but not write.

        dlyubimov Dmitriy Lyubimov added a comment -

        I have been dumping from Spark to HDFS, HBase, memory-mapped index structures, you name it, for 2 years.

        Pat, something is definitely going wrong there – but not by design.

        pferrel Pat Ferrel added a comment -

        OK, something is misconfigured maybe, perhaps in IDEA. Let's leave IDEA out of the loop.

        If you could indulge me--how do you run this from the CLI?

        pferrel Pat Ferrel added a comment - - edited

        Silence indicates: you don't know how to? It can't be done because jars aren't created for it yet? You'd rather launch from the Scala shell? If the latter, that's fine; I just want to get IDEA out of the equation, so instructions for running in the Scala shell would be helpful.

        I plan to move on to using HDFS for storage but still have a local storage failure below.

        Concentrating on local storage for now I get the following from my dev machine launching in IDEA:

        input      | output     | mahoutSparkContext(masterUrl = ...) | success?
        local path | local path | "local"                             | yes
        local path | local path | "spark://Maclaurin:7077"            | yes
        local path | local path | "spark://occam4:7077"               | no: the computation finishes correctly but the last stage, the dump/write of the DRM, fails; the Spark master is a remote machine that is also the HDFS master and manages three Spark slaves; all is OK in the WebUI and there are no errors in the Spark logs
        For this last case I have tried various forms of the "local path" for output and suspect that the form of the URI may be the problem, so if someone sees the mistake please let me know:
        1) "tmp/co-occurrence-on-epinions/indicators-item-item/" relative path to the IDEA working directory, which works for input.
        2) "/Users/pat/hdfs-mirror/tmp/co-occurrence-on-epinions/indicators-item-item/" absolute path so no IDEA working directory
        3) "file:///Users/pat/hdfs-mirror/tmp/co-occurrence-on-epinions/indicators-item-item/" URI form of full local path

        Code for #3 is:

        RecommendationExamplesHelper.saveIndicatorMatrix(indicatorMatrices(0),
        "file:///Users/pat/hdfs-mirror/tmp/co-occurrence-on-epinions/indicators-item-item/")

        For #3 I get the following exception message. The _temporary dir does exist; there is just nothing in it:

        14/04/15 09:07:03 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at Recommendations.scala:178
        Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 8.0:0 failed 4 times (most recent failure: Exception failure: java.io.IOException: The temporary job-output directory file:/Users/pat/hdfs-mirror/tmp/co-occurrence-on-epinions/indicators-item-item/_temporary doesn't exist!)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
        Disconnected from the target VM, address: '127.0.0.1:58830', transport: 'socket'

        pferrel Pat Ferrel added a comment -

        Running from my dev machine in IDEA against a remote cluster I can read input from HDFS and the computation on Spark seems to complete correctly but the write fails.

        input                      | output                    | mahoutSparkContext(masterUrl = ...) | success?
        HDFS:/occam4/user/pat/xrsj | HDFS:/occam4/user/pat/tmp | spark://occam4:7077                 | no: the computation completes, but the output stage gives a failure-to-connect message while trying to write to HDFS

        Again this may be a URI error in the output path or some config problem. From the shell I can examine HDFS and all is as expected. Hadoop jobs that I launch from the command line work against the cluster correctly.

        14/04/15 10:04:02 INFO storage.MemoryStore: Block broadcast_3 stored as values to memory (estimated size 385.2 KB, free 4.7 GB)
        14/04/15 10:04:02 INFO rdd.FlatMappedRDD: Removing RDD 16 from persistence list
        14/04/15 10:04:02 INFO storage.BlockManager: Removing RDD 16
        14/04/15 10:04:02 INFO rdd.FlatMappedRDD: Removing RDD 7 from persistence list
        14/04/15 10:04:02 INFO storage.BlockManager: Removing RDD 7
        14/04/15 10:04:09 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:10 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:11 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:12 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:13 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:14 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:15 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:16 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:17 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        14/04/15 10:04:18 INFO ipc.Client: Retrying connect to server: occam4/192.168.0.14:8020. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
        Exception in thread "main" java.net.ConnectException: Call to occam4/192.168.0.14:8020 failed on connection exception: java.net.ConnectException: Connection refused
        at org.apache.hadoop.ipc.Client.wrapException(Client.java:1142)
        at org.apache.hadoop.ipc.Client.call(Client.java:1118)
        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
        at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
        at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source)
        at org.apache.hadoop.ipc.RPC.checkVersion(RPC.java:422)
        at org.apache.hadoop.hdfs.DFSClient.createNamenode(DFSClient.java:183)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:281)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:245)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:100)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1446)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1464)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
        at org.apache.hadoop.mapred.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:193)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:685)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:572)
        at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:894)
        at org.apache.mahout.cf.examples.RecommendationExamplesHelper$.saveIndicatorMatrix(Recommendations.scala:178)
        at org.apache.mahout.cf.examples.RunCrossCooccurrenceAnalysisOnEpinions$.main(Recommendations.scala:111)
        at org.apache.mahout.cf.examples.RunCrossCooccurrenceAnalysisOnEpinions.main(Recommendations.scala)
        Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:511)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:481)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:457)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:583)
        at org.apache.hadoop.ipc.Client$Connection.access$2200(Client.java:205)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1249)
        at org.apache.hadoop.ipc.Client.call(Client.java:1093)
        ... 26 more
        Disconnected from the target VM, address: '127.0.0.1:59483', transport: 'socket'

        dlyubimov Dmitriy Lyubimov added a comment -

        [My] Silence indicates I've been pretty sick.

        I thought I explained in my email that we are not planning a CLI. We are planning a script shell instead. There is not, nor do I think there will be, a way to run this stuff from a CLI, just like there's no way to invoke a particular method in R without writing a short script.

        That said, yes, you can try to run it as a java application, i.e. [java|scala] -cp <cp> <class name>

        where -cp is what `mahout classpath` returns.

        pferrel Pat Ferrel added a comment -

        To sum up, Spark cooccurrence seems to complete correctly on the Spark cluster in any of the configurations. Writing output has been failing in every case when using the remote Spark cluster for computation. However, as far as I can tell, input from the local filesystem or HDFS seems to work in all cases.

        Next I'll try running my tests from the Spark master machine by installing IDEA there. There must be some other way than IDEA to run this?

        dlyubimov Dmitriy Lyubimov added a comment - - edited

        Running from my dev machine in IDEA against a remote cluster I can read input from HDFS and the computation on Spark seems to complete correctly but the write fails.

        input, output, mahoutSparkContext(masterUrl = , Success?
        HDFS:/occam4/user/pat/xrsj, HDFS:/occam4/user/pa

        Is this a Spark task log or a front-end log? It seems a worker process on the cluster is trying to connect to the HDFS namenode (IIRC port 8020 is the Hadoop namenode service) and fails. Config/networking issues.

        dlyubimov Dmitriy Lyubimov added a comment -

        where -cp is what `mahout classpath` returns.

        Actually, scratch that. That generally is still a bad recipe. "mahout classpath" would return the installed HADOOP_HOME dependencies (normally one doesn't want that because Spark's managed-libs already expose whatever version of hadoop it was compiled with), and it neglects to add the Spark classpath. So I don't think 'mahout classpath' is very useful here.

        BTW, that's another thing here – you need to compile Spark against the version of hadoop hdfs you intend to use (at least that's what I do). By default I think it does a terrible thing.

        The main suggestion stands – collect the classpath correctly, which IDEA already does via maven; the major hurdle is doing it manually, and user-friendly methods for that are not yet present, methinks.

        pferrel Pat Ferrel added a comment -

        Well, here's something I noticed that may be a clue. First, there were some scala 2.10 jars that were built for hadoop 1.0.4 sitting in an assembly/target dir. So even though the managed_lib dir had the correct 1.2.1 version of hadoop, I rebuilt and got rid of any 1.0.4 jars I could find.

        Then if I am running hadoop and mahout locally I can launch the Spark shell, where it creates a default context called sc. I can then perform the following:

        Created spark context..
        Spark context available as sc.

        scala> val textFile = sc.textFile("xrsj/ratings_data.txt")
        14/04/16 18:19:52 INFO storage.MemoryStore: ensureFreeSpace(61374) called with curMem=0, maxMem=318111744
        14/04/16 18:19:52 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 59.9 KB, free 303.3 MB)
        textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

        scala> textFile.count()
        14/04/16 18:19:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        ...
        14/04/16 18:20:00 INFO spark.SparkContext: Job finished: count at <console>:15, took 0.995702 s
        res0: Long = 664824

        If I exit the spark shell, start a local pseudo-cluster, then start the shell, the same code works, only reading from the hdfs pseudo-cluster. The exact same code works for the cluster too, since the file is at the same location relative to where I start the shell.

        I can also address the file in absolute terms with the line below. Notice I need to use the port #; leaving it off leads to the failure-to-connect message in a previous comment.

        scala> val textFile = sc.textFile("hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt")
        scala> textFile.count()

        I tried using the port # in the cooccurrence test case but get the same failure to connect message.

        Since the Spark Scala shell is creating the context by detecting the machine's HDFS setup, could this be the problem when running the cooccurrence example in IDEA? The context in the example is set up after the input is read from HDFS, is that correct? I know it is not supposed to care about HDFS, only the Spark master, but obviously when a context is created by the Spark shell and that context is used to get the text file, it works. Should we be doing that in the example? I'll try playing with how the text file is read and where the context is created.

        Perhaps naively, I would have thought that the URI I used for read and write would bypass any default settings in the context, but this doesn't seem to be true, does it? I suspect something in the context, or the lack of it, is causing Spark to be confused about where to read and write from, no matter how explicit the URI.
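        For readers checking the same thing, a hedged sketch of how to inspect which default filesystem the driver-side Hadoop configuration resolves to, independent of the Mahout context helpers; fs.default.name is the Hadoop 1.x key (Hadoop 2 uses fs.defaultFS), and the master and namenode URIs simply mirror the values quoted above, so treat them as placeholders:

        import org.apache.spark.{SparkConf, SparkContext}

        // Build a plain context against the cluster master (placeholder URL).
        val checkSc = new SparkContext(new SparkConf()
          .setMaster("spark://occam4:7077")
          .setAppName("hdfs-uri-check"))

        // Which filesystem do unqualified paths resolve against?
        println("default fs: " + checkSc.hadoopConfiguration.get("fs.default.name"))

        // Prefer fully qualified URIs so reads and writes cannot silently land elsewhere.
        val ratings = checkSc.textFile("hdfs://occam4:54310/user/pat/xrsj/ratings_data.txt")
        println("lines: " + ratings.count())
        ratings.saveAsTextFile("hdfs://occam4:54310/user/pat/tmp/uri-check")

        checkSc.stop()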

        pferrel Pat Ferrel added a comment -

        Getting the cooccurrence code to read or write to HDFS is still not working. The cooccurrence code does not seem to use the context that is created, though the computation does execute on the cluster and seems to complete properly; I wonder if the context is needed to do the read/write, as it is in the above spark-shell example. So the following val is not used, AFAICT.

        implicit val sc = mahoutSparkContext(masterUrl = "spark://occam4:7077", appName = "MahoutClusterContext",
        customJars = Traversable.empty[String])

        I can't even get this job to complete using the local file system; some strange paths are created for "_temporary" depending on who knows what. One even looked like some version of Linux I don't own:

        Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 8.0:0 failed 4 times (most recent failure: Exception failure: java.io.IOException: The temporary job-output directory file:/private/tmp/tmp/co-occurrence-on-epinions/indicators-item-item/_temporary doesn't exist!)

        /private/tmp ??? What is that, CentOS? I'm using Ubuntu 12.04.

        Onwards to looking at the Spark config.

        Can you answer the question about why we don't use the context 'sc' to read and write as with the spark-shell example?

        pferrel Pat Ferrel added a comment -

        Oh, and are you using the spark-shell to execute the cooccurrence examples?

        dlyubimov Dmitriy Lyubimov added a comment -

        The IDEA run most likely will have a screwed-up hadoop dependency, because by default it inherits it from Mahout's default dependency. I used to run this stuff (or rather our internal variant of this stuff) from my own project, which has very strict control over dependencies (esp. hadoop dependencies). I also inserted a CDH4 profile into the spark module which overrides Mahout's default hadoop dependency, and that should help – but it is still a pain; I gave up on running it from IDEA with Mahout maven dependencies. Something is screwed there in the end.

        I don't experiment with RSJ yet – I guess I will leave it to Sebastian at this point.

        What I do is run the following script, on my "shell" branch in github, via the mahout shell:

        "simple.mscala"
         
        val a = dense((1,2,3),(3,4,5))
        val drmA = drmParallelize(a,numPartitions = 2)
        val drmAtA = drmA.t %*% drmA
        
        val r = drmAtA.mapBlock() {
          case (keys, block) =>
            block += 1.0
            keys -> block
        }.checkpoint(/*StorageLevel.NONE*/)
        
        r.collect
        
        // local write
        r.writeDRM("file:///home/dmitriy/A")
        
        // hdfs write
        r.writeDRM("hdfs://localhost:11010/A")
        

        which actually runs totally fine in local mode, and sometimes also runs OK in "standalone"/hdfs mode, but sometimes there are strange after-effects of hangs and bailing out with OOM when run on a remote cluster with "standalone".

        I am pretty sure it is either dependency issues again in the Mahout maven build, or something that has happened to the Spark 0.9.x release. Spark 0.6.x – 0.8.x releases and earlier had absolutely no trouble working with hdfs sequence files.

        dlyubimov Dmitriy Lyubimov added a comment -

        Also just discovered that the sbt build in 0.9.1 screws up the hbase dependency. Not likely to be much of a reason, but who knows.

        dlyubimov Dmitriy Lyubimov added a comment -

        Hm. At home I don't have any trouble reading/writing from/to hdfs.

        There are some minor differences in configuration, plus I am running hdfs cdh 4.3.2 at home vs. 4.3.0 on the work computer. That's the only difference.

        (something patch-level specific?)

        pferrel Pat Ferrel added a comment -

        It looks like our setups are pretty much identical as far as I can tell. The primary difference is in using the IDE to launch and that may be causing the problem.

        Therefore I'll put the testing aside for a while and work on getting Dmitriy's Spark Scala shell working, since we know that write from there is working--at least writeDRM is.

        As I've said, it looks like the cluster cooccurrence computation (both cross and self similarity) is being executed properly on the epinions data but I'm unable to get a file output.

        pferrel Pat Ferrel added a comment -

        Runs correctly on clustered Spark and HDFS.

        Is there more to do here? Are the other similarity types needed?

        ssc Sebastian Schelter added a comment -

        Pat Ferrel Great, how large was your test dataset?

        I'd vote against other similarity types for the sake of simplicity; LLR also works best in my experience.

        pferrel Pat Ferrel added a comment -

        Agreed about LLR. Never saw it underperform the other measures for CF.

        I'm using a very small dataset with minSplits = 2 so I can check the actual values. Then I run the epinions data, which I can't really check for correct values. Testing goes through the ItemSimilarityDriver from MAHOUT-1541, so the write is not using your patch.

        Unfortunately I lied. It works with HDFS in and out, but on a multi-threaded local[4] standalone Spark. Setting the master to my cluster still fails. The error message is about a connection being refused, so something is still not configured correctly on my cluster. I had to use fully qualified URIs to the data because Spark was defaulting to the wrong locations. All of which points to a bad Spark build or conf. Spark-shell seems to work fine on the cluster. Anyway, I'll reinstall Spark and try again. Sorry for the false alarm.

        dlyubimov Dmitriy Lyubimov added a comment -

        Is there anything else to commit here?

        pferrel Pat Ferrel added a comment -

        There have been no commits AFAIK. The status is for Sebastian to say, but I've used the cooccurrence analysis and it works correctly. I can't verify Spark cluster execution with HDFS due to what I think is my own bad setup.

        If someone else could test it on a cluster I'd say it should be committed. If we can wait, I'm trying to get my cluster upgraded to Hadoop 2 and to reconfigure Spark for that, then try testing this on the new setup.

        There are no Scala tests for this, though there are some in the patches. I'm adding Scala tests that will cover this code while doing a CLI in MAHOUT-1541, which is a few weeks away from being ready to commit.

        Not sure if it's packaged correctly; the tests supplied here are really examples, since they run on large datasets and take a long time to execute.

        Bottom line is it needs to be verified on a cluster and checked for package structure. I'm happy to do this if we don't need it committed right away. Both of these things need to be done as part of MAHOUT-1541, which I'm actively working on but which is not really ready to review yet.

        pferrel Pat Ferrel added a comment -

        Sebastian Schelter Should I reassign this to myself for now so we can get this committed?

        pferrel Pat Ferrel added a comment -

        Looks like DrmLike may have been refactored since this patch was written.

        Dmitriy Lyubimov The following patch code has an error at "elem" saying "Missing parameter type 'elem'". Looking at the scaladocs I tracked back to the DrmLike trait and see no way to call .mapBlock on it. Has something been refactored here? The .nonZeroes() is a Java sparse vector iterator, I think. This worked about a month ago, so I thought you might have an idea of how things have changed.

          def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
                                bcastNumInteractionsB: Broadcast[Vector], bcastNumInteractionsA: Broadcast[Vector],
                                crossCooccurrence: Boolean = true) = {
            drmBtA.mapBlock() {
              case (keys, block) =>
        
                val llrBlock = block.like()
                val numInteractionsB: Vector = bcastNumInteractionsB
                val numInteractionsA: Vector = bcastNumInteractionsA
        
                for (index <- 0 until keys.size) {
        
                  val thingB = keys(index)
        
                  // PriorityQueue to select the top-k items
                  val topItemsPerThing = new mutable.PriorityQueue[(Int,Double)]()(orderByScore)
        
                  block(index, ::).nonZeroes().foreach { elem => //!!!!!!!!!!!!! Error: "Missing parameter type 'elem'"
                    val thingA = elem.index
                    val cooccurrences = elem.get
        
        dlyubimov Dmitriy Lyubimov added a comment -

        I think this has nothing to do with anything in Spark or the Scala bindings.

        .nonZeroes() is a mahout-math (Java) method which produces a Java iterable, which is then implicitly converted to a Scala one (since .foreach is a Scala operator).

        Is JavaConversions._ still imported?
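
        (As an aside, a minimal self-contained sketch of that implicit conversion, assuming only mahout-math on the classpath; the vector and its values are illustrative:)

            import org.apache.mahout.math.RandomAccessSparseVector
            import scala.collection.JavaConversions._ // lets a java.lang.Iterable be used with .foreach

            val v = new RandomAccessSparseVector(5)
            v.setQuick(1, 2.0)
            v.setQuick(3, 4.0)

            // nonZeroes() returns a java.lang.Iterable[Vector.Element]; without the
            // JavaConversions import the .foreach call does not compile.
            v.nonZeroes().foreach { elem =>
              println(s"index=${elem.index()} value=${elem.get()}")
            }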

        dlyubimov Dmitriy Lyubimov added a comment -

        If you want me to verify this, please convert it to a pull request so I can painlessly sync to exactly what you are testing.

        pferrel Pat Ferrel added a comment -

        import scala.collection.JavaConversions._

        is included. I'll pare back to just this ticket and send a PR.

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov closed the pull request at:

        https://github.com/apache/mahout/pull/8

        githubbot ASF GitHub Bot added a comment -

        GitHub user dlyubimov reopened a pull request:

        https://github.com/apache/mahout/pull/8

        MAHOUT-1464 Cooccurrence Analysis on Spark

        Grabbed Pat's branch. submitting as PR (WIP at this point).

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/dlyubimov/mahout MAHOUT-1464

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/mahout/pull/8.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #8


        commit 70654fa58dd4b801c551429945fa2f1377a60b2e
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-02T21:11:55Z

        starting to merge the cooccurrence stuff, import errors

        commit fc5fb6ac37e4c12d25c35ddb7912a32aac06e449
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-02T21:33:45Z

        tried changing the imports in CooccurrenceAnalysis.scala to no avail

        commit 242aed0e0921afe9a87ee8973ba8077cbe65fffa
        Author: Dmitriy Lyubimov <dlyubimov@apache.org>
        Date: 2014-06-02T22:42:57Z

        Compilation fixes, updates for MAHOUT-1529 changes


        pferrel Pat Ferrel added a comment -

        My problem is that my cluster is on 1.2.1, and to upgrade, everything I run on it has to go to Hadoop 2. Oh bother.

        I think the best thing is to commit this and see if someone will run one of the several included tests on a cluster. It works locally and seems to work clustered, but the write fails. The write is not part of the core code.

        Anyway, unless someone vetoes, I'll commit it once I get at least one build-integrated test included.

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/8#discussion_r13425122

        — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala —
        @@ -0,0 +1,210 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.mahout.cf
        +
        +import org.apache.mahout.math._
        +import scalabindings._
        +import RLikeOps._
        +import drm._
        +import RLikeDrmOps._
        +import org.apache.mahout.sparkbindings._
        +
        +import scala.collection.JavaConversions._
        +import org.apache.mahout.math.stats.LogLikelihood
        +import collection._
        +// import scala.collection.parallel.mutable
        — End diff –

        we can remove the .parallel. import

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45224589

        OK, I tend to use IDEA import optimization, which works about 90% of the time. Notice that the mutable import messes things up so D removed that.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45234064

        I could use a little advice here. The epinions and movielens tests are in the examples folder. Should they be put into the build?

        Pros: good example data.
        Cons: the reading and writing are not parallel and so only work locally. It is easy to change the Spark context to use a cluster but the data still has to be local. These tests would be easier to maintain if they were attached to the ItemSimilarityDriver, which will handle cluster storage and execution and will be maintained better.

        I'd rather move them out into an ItemSimilarityDriver examples folder and will do this if no one objects. They will not be build tests, obviously, since they take too long.

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45234269

        It's not allowed to redistribute the movielens dataset.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45234551

        yes, but downloading is described in the comments

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45235053

        I guess I'm suggesting that examples like these might be good in the right place. Not as build tests but as usage examples. As long as they use only supported code (read/write for instance)

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45237262

        I Like Pat's github avatar :+1:

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45238498

        You a Ren and Stimpy fan or is it just the way you feel sometimes?

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45239528

        I don't even know that cartoon, just thought it was funny. Yeah, thinking of it, it is how I used to feel most of the time looking at my colleagues' code at work.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45241940

        Hah, that's me looking at my own code

        githubbot ASF GitHub Bot added a comment -

        GitHub user pferrel opened a pull request:

        https://github.com/apache/mahout/pull/12

        Mahout 1464

        MAHOUT-1464 looks ready to me but can't push it yet.

        My build is broken from an unrelated mr-legacy test so I'll wait to push it until my local build passes but wanted to get this out for review if anyone cares.

        I took out the epinions and movielens examples, will add them back in with the CLI driver maybe.

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/pferrel/mahout mahout-1464

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/mahout/pull/12.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #12


        commit 107a0ba9605241653a85b113661a8fa5c055529f
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-04T19:54:22Z

        added Sebastian's CooccurrenceAnalysis patch updated it to use current Mahout-DSL

        commit 16c03f7fa73c156859d1dba3a333ef9e8bf922b0
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-04T21:32:18Z

        added Sebastian's MurmurHash changes

        Signed-off-by: pferrel <pat@occamsmachete.com>

        commit c6adaa44c80bba99d41600e260bbb1ad5c972e69
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-05T16:52:23Z

        MAHOUT-1464 import cleanup, minor changes to examples for running on Spark Cluster

        commit 1d66e5726e71e297ef4a7a27331463ba363098c0
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T20:19:32Z

        scalatest for cooccurrence cross and self along with other CooccurrenceAnalyisi methods

        commit 766db0f9e7feb70520fbd444afcb910788f01e76
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T20:20:46Z

        Merge branch 'master' into mahout-1464

        commit e492976688cb8860354bb20a362d370405f560e1
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T20:50:07Z

        cleaned up test comments

        commit a49692eb1664de4b15de1864b95701a6410c80c8
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T21:09:55Z

        got those cursed .DS_Stores out of the branch and put an exclude in .gitignore


        pferrel Pat Ferrel added a comment -

        While I was waiting for the build to settle down I wrote some more tests for different value types. The same rows/columns are used for each input, so all the LLR indicator matrices should be the same, and the expected values come from the Hadoop code. But using integer values with magnitude larger than 1 returns an empty indicator matrix.

        input:

        val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))

        should produce

        val matrixLLRCoocAtAControl = dense(
        (0.0, 1.7260924347106847, 0, 0, 0),
        (1.7260924347106847, 0, 0, 0, 0),
        (0, 0, 0, 1.7260924347106847, 0),
        (0, 0, 1.7260924347106847, 0, 0),
        (0, 0, 0, 0, 0)
        )

        However, the following returns an empty matrix:

        val drmCooc = CooccurrenceAnalysis.cooccurrences(drmARaw = drmA, drmBs = Array(drmB))
        //var cp = drmSelfCooc(0).checkpoint()
        //cp.writeDRM("/tmp/cooc-spark/")//to get values written
        val matrixSelfCooc = drmCooc(0).checkpoint().collect

        matrixSelfCooc is always empty. Took the same input to the Mahout version using LLR and it produces the correct result == matrixLLRCoocAtAControl.

        Still investigating why this happens.

        pferrel Pat Ferrel added a comment -

        The indicator matrix contains self-similarity values. The code seems to imply that self-similarity values should be excluded. Certainly the Mahout itemsimilarity job doesn't return them.

        pferrel Pat Ferrel added a comment -

        Seems like the downsampleAndBinarize method is returning the wrong values. It is actually summing the values where it should be counting the non-zero elements?

        // Downsample the interaction vector of each user
        for (userIndex <- 0 until keys.size) {

          val interactionsOfUser = block(userIndex, ::) // this is a Vector

          // If the values are non-boolean, the sum will not be the number of interactions,
          // it will be a sum of strength-of-interaction, right?
          // val numInteractionsOfUser = interactionsOfUser.sum // doesn't this sum strength of interactions?
          val numInteractionsOfUser = interactionsOfUser.getNumNonZeroElements() // should do this I think

          val perUserSampleRate = math.min(maxNumInteractions, numInteractionsOfUser) / numInteractionsOfUser

          interactionsOfUser.nonZeroes().foreach { elem =>
            val numInteractionsWithThing = numInteractions(elem.index)
            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing

            if (random.nextDouble() <= math.min(perUserSampleRate, perThingSampleRate)) {
              // We ignore the original interaction value and create a binary 0-1 matrix
              // as we only consider whether interactions happened or did not happen
              downsampledBlock(userIndex, elem.index) = 1
            }
          }
        }
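
        (To make the sum-vs-count distinction concrete, a tiny illustrative sketch; the vector and values are made up:)

            import org.apache.mahout.math.RandomAccessSparseVector

            // With non-boolean data, the sum of a row is the total interaction
            // strength, not the number of interactions.
            val interactionsOfUser = new RandomAccessSparseVector(5)
            interactionsOfUser.setQuick(0, 1000.0)
            interactionsOfUser.setQuick(1, 10.0)

            interactionsOfUser.zSum()                  // 1010.0 - total strength
            interactionsOfUser.getNumNonZeroElements() // 2      - number of interactions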

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45557670

        Pat, so we are not going to use this one for merging, I take it? I will close it; you can keep working on your other requests.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45560733

        According to the instructions I merge from my branch anyway. I can close it, right? What is the instruction for closing without merging?

        I assume you got my mail about finding the blocker; now there are some questions about the cooccurrence algo itself.

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45565428

        You can close it – but since I originated the PR, it is easier for me (I have access to the "close" button on it, while everyone else would have to use a "close apache/mahout#8" commit to do the same).

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45626614

        Go ahead and hit the button. Still have a bit more to do here.

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov closed the pull request at:

        https://github.com/apache/mahout/pull/8

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on the pull request:

        https://github.com/apache/mahout/pull/8#issuecomment-45646072

        The cooccurrence analysis code should go into the math-scala module, not the spark module, as it is independent of the underlying engine.

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45646307

        I assume this is the current PR for MAHOUT-1464?

        pferrel Pat Ferrel added a comment - - edited

        I think the same thing is happening with the number of item interactions:

        // Broadcast vector containing the number of interactions with each thing
        val bcastNumInteractions = drmBroadcast(drmI.colSums) // sums?

        This broadcasts a vector of sums. We need a getNumNonZeroElements() for columns, actually a way to get a Vector of non-zero counts per column. We could get them from the rows of the transposed matrix before doing the multiply of A.t %*% A or B.t %*% A, in which case we'd get non-zero counts from the rows. Either way I don't see a way to get a vector of these values without doing a mapBlock on the transposed matrix. Am I missing something?

        Currently the IndexedDataset is a very thin wrapper, but I could add two vectors which contain the number of non-zero elements for rows and columns. In that case I would perhaps have it extend CheckpointedDrm. Since CheckpointedDrm extends DrmLike it could be used in the DSL algebra directly, in which case it would be simple to do the right thing with these vectors as well as the two id dictionaries for transpose and multiply, but it's a slippery slope.

        Before I go off in the wrong direction, is there an existing way to get a vector of non-zero counts for rows or columns?

        tdunning Ted Dunning added a comment -

        I don't think that numNonZero can be trusted here. The contract it provides is to return an upper bound on the number of non-zeros, not a precise value.

        Better to write specific code.

        pferrel Pat Ferrel added a comment -

        OK, good to know. So the fix above for rows is not good either, oh bother.

        If I have to write specific code, might it be better to put it in the DRM and/or Vector?

        tdunning Ted Dunning added a comment -

        Matrix and Vector already have something that can be used:

            Vector counts = x.aggregateColumns(new VectorFunction() {
              @Override
              public double apply(Vector f) {
                return f.aggregate(Functions.PLUS, Functions.greater(0));
              }
            });
        
        pferrel Pat Ferrel added a comment -

        The algo transposes A (the primary) before self-cooccurrence. That gives us a point where columns are available as rows, which in turn makes distributed ops on the DRM simple. So rather than looking at the counts for columns, my earlier proposal was to look at the same data when it is a row. Might this be better, since it can easily be a distributed calculation?

        In other words, since A.t * A is calculated, we can split this into a transpose and a multiply, taking column counts from the rows of A.t and then doing the multiply afterwards. Each entry in the list of calculations, A.t * A, B.t * A, ..., includes a state where the columns turn into rows, so the same approach can be used.

        This turns what was a bug into a significant optimization: if the data is already boolean, colSums can be used and no distributed counting is needed.

        Not sure if the above is all true, so read it as a question.

        tdunning Ted Dunning added a comment -

        I don't understand the question.

        In fact, the transpose is never computed explicitly. There is a special operation that does A' A in a single pass and step. It is possible to fuse the down-sampling into this multiplication, but not possible to fuse the column counts. For large sparse A, the value of A'A is computed using a map-reduce style data flow where each row is examined and all cooccurrence counts are emitted to be grouped by row number later.

        In order to save memory, it is probably a good idea to discard the original counts as soon as they are reduced to binary form and down-sampled.

        For computing counts, it is possible to accumulate column sums in a row-wise accumulator at the same time that row sums are accumulated one element at a time. This avoids a pass over A and probably helps significantly on speed.

        ssc Sebastian Schelter added a comment -

        Hi,

        The computation of A'A is usually done without explicitly forming A'.
        Instead A'A is computed as the sum of outer products of rows of A.

        --sebastian
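
        (For the record, a small in-core sketch of that identity using mahout-math; the matrix here is illustrative only:)

            import org.apache.mahout.math.{DenseMatrix, Matrix}

            val a: Matrix = new DenseMatrix(Array(
              Array(1.0, 0.0, 1.0),
              Array(0.0, 1.0, 1.0)))

            // Accumulate A'A as the sum of outer products of the rows of A,
            // so A' is never formed explicitly.
            var ata: Matrix = new DenseMatrix(a.numCols(), a.numCols())
            for (r <- 0 until a.numRows()) {
              val row = a.viewRow(r)
              ata = ata.plus(row.cross(row))
            }

            // ata should equal a.transpose().times(a)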

        pferrel Pat Ferrel added a comment -

        Ok, learned something today.

        As to using Java's x.aggregateColumns: it looks like there are distributed Spark versions of colSums and the rest. They use Spark accumulators to avoid pulling the entire matrix into memory. I followed those models and created "colCounts" in MatrixOps and SparkEngine, then used it instead of colSums.

        Cooccurrence now passes tests with non-boolean data.

        Scary adding to Dmitriy's code, though, so I'll invite him to look at it. Added a couple of tests, but I don't see many for SparkEngine.

        https://github.com/pferrel/mahout/compare/mahout-1464

        Still having problems getting mr-legacy to pass tests; spark and math-scala pass their tests.

        tdunning Ted Dunning added a comment -

        Should there be a dedicated colCounts function, or a more general accumulator?

        Basically, a row-by-row or column-by-column map-reduce aggregator is a common thing to need. This is different from the aggregateColumns we now have, since what we have now doesn't require access to the entire row.

        What I would be more interested in would be something like

              Vector r = v.aggregateByRows(DoubleDoubleFunction combine, DoubleFunction map)
        

        The virtue here is that iteration by rows is an efficient way to handle row-major arrangements, but iteration by column works as well:

              for (MatrixSlice row : m) {
                   for (int i = 0; i < columns; i++) {
                    r.setQuick(i, combine.apply(r.getQuick(i), map.apply(row.getQuick(i))));
                   }
              }
        

        or

             for (MatrixSlice col: m.columnIterator()) {
                 r.setQuick(col.index(), col.aggregate(combine, map));
            }
        

        These are approximate and we don't really have a columnIterator, but you can imagine how some kinds of matrix would have such a thing internally. You can also see how trivial these would be to parallelize. Arrangements which have row-wise patches of column-major data would also be easy to handle by combining these patterns.

        tdunning Ted Dunning added a comment -

        Scary adding to Dmitriy's code though so I'll invite him to look at it. Added a couple tests but I don't see many for SparkEngine.

        We don't have author tags. This is our code now and we all have to feel a bit of ownership and entitlement. Go for it.

        pferrel Pat Ferrel added a comment -

        Check the code and let me know if there are problems. It uses a Spark accumulator Vector to keep track of the non-zero column counts. Accumulators seem like a nice simplification.

        Point #2: I can still only read about 50% of D's code and can only keep in my brain about 10% of the overlapping traits and classes. Not concerned with authorship, just correctness.
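
        (Roughly, the accumulator approach looks like the sketch below. This uses the Spark 1.x-era accumulator API directly, with plain arrays instead of Mahout vectors, and the row layout is only an assumption for illustration; it is not the actual colCounts implementation:)

            import org.apache.spark.{AccumulatorParam, SparkContext}
            import org.apache.spark.rdd.RDD

            // Element-wise addition is the merge operation for the accumulator.
            object VectorAccumulatorParam extends AccumulatorParam[Array[Double]] {
              def zero(init: Array[Double]): Array[Double] = new Array[Double](init.length)
              def addInPlace(a: Array[Double], b: Array[Double]): Array[Double] = {
                for (i <- a.indices) a(i) += b(i)
                a
              }
            }

            def nonZeroColumnCounts(sc: SparkContext, rows: RDD[Array[Double]], ncol: Int): Array[Double] = {
              val acc = sc.accumulator(new Array[Double](ncol))(VectorAccumulatorParam)
              rows.foreach { row =>
                val counts = new Array[Double](ncol)
                for (i <- 0 until ncol) if (row(i) != 0.0) counts(i) = 1.0
                acc += counts // updated on the workers, summed element-wise
              }
              acc.value // read back on the driver
            }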

        dlyubimov Dmitriy Lyubimov added a comment -

        Put comments on the PR.

        BTW, in order for PR comments to mirror into JIRA, you need to use the phrase MAHOUT-1464 in the PR name, not Mahout 146e or Mahout-1646 (that's the way the ASF bot apparently works).

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45912084

        Awaiting Sebastian's take on the naming of 'colCounts' to better fit R-like semantics.

        githubbot ASF GitHub Bot added a comment -

        Github user tdunning commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45913357

        This discussion isn't getting echoed to the mailing list. I didn't even know it was happening.

I think that a non-zero counter is nice, but it would be better to have a more general aggregator of some sort. We already have two instances of this pattern and there will be more (a sum of the absolute values is common).

Why not implement a general aggregator? This is different from our current aggregateColumns because that function is not parallelizable.

Something like def columnAggregator(combiner, mapper) is what I am aiming for. A positive counter would then be m.columnAggregator(_ + _, _ > 0).
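
As a side note on that sketch: with Scala function literals the mapper would have to yield a Double rather than a Boolean, so a working positive counter might look more like the following. columnAggregator itself remains a hypothetical API.

    val combiner: (Double, Double) => Double = _ + _
    val mapper: Double => Double = x => if (x > 0) 1.0 else 0.0
    // m.columnAggregator(combiner, mapper)   // hypothetical signature from the comment above
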

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13711381

— Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
@@ -188,4 +188,8 @@ object MatrixOps
     def apply(f: Vector): Double = f.sum
   }

+  private def vectorCountFunc = new VectorFunction {
+    def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
+  }
+
 }
— End diff –

It looks like it to me. I don't have time to look in depth, but the distributed code definitely counts non-negatives with an explicit inline conditional > 0.

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13711414

— Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
@@ -188,4 +188,8 @@ object MatrixOps
     def apply(f: Vector): Double = f.sum
   }

+  private def vectorCountFunc = new VectorFunction {
+    def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
+  }
+
 }
— End diff –

It is very easy to tweak the tests to check, though, if in doubt.
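
A quick in-core check along those lines, using only the stock Vector/Functions API that the diff relies on; the test values are illustrative.

    import org.apache.mahout.math.DenseVector
    import org.apache.mahout.math.function.Functions

    val v = new DenseVector(Array(0.0, 3.0, -1.0, 2.0))
    // Functions.greater(0) maps x to 1.0 when x > 0 and to 0.0 otherwise, so this
    // aggregate counts the strictly positive entries: 2.0 for the vector above.
    val positives = v.aggregate(Functions.PLUS, Functions.greater(0))
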

        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45915940

Fix the header to say MAHOUT-1464, then hit close and reopen; that will restart the echo.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45917234

        I already fixed the header.

I agree with Ted; that's kind of what functional programming is for. The reason I didn't use the Java aggregate is that it isn't distributed. Still, this is probably beyond this ticket. I'll refactor if a Scala journeyman wants to provide a general mechanism; I'm still on training wheels.

This still needs to be tested in a distributed Spark+HDFS environment, and MAHOUT-1561 will make that testing easy. I'd be happy to merge this and move on, which will have the side effect of testing on larger datasets and clusters.

If someone wants to test this now on a Spark+HDFS cluster, please do!

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45921381

I think the name colCounts is misleading; we should go with something like numNonZeroElementsPerColumn or so, not sure here.

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13713951

        — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala —
        @@ -0,0 +1,214 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.mahout.cf
        +
        +import org.apache.mahout.math._
        +import scalabindings._
        +import RLikeOps._
        +import drm._
        +import RLikeDrmOps._
        +import org.apache.mahout.sparkbindings._
        +import scala.collection.JavaConversions._
        +import org.apache.mahout.math.stats.LogLikelihood
        +import collection._
        +import org.apache.mahout.common.RandomUtils
        +import org.apache.mahout.math.function.

        {VectorFunction, Functions}

        +
        +
        +/**
        + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
        + * available at http://www.mapr.com/practical-machine-learning
        + *
        + * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
        + * Scalable Similarity-Based Neighborhood Methods with MapReduce
        + * ACM Conference on Recommender Systems 2012"
        + */
        +object CooccurrenceAnalysis extends Serializable {
        +
        + /** Compares (Int,Double) pairs by the second value */
        + private val orderByScore = Ordering.fromLessThan[(Int, Double)]

        { case ((_, score1), (_, score2)) => score1 > score2}

        +
        + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
        + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
        +
        + implicit val distributedContext = drmARaw.context
        +
        + // Apply selective downsampling, pin resulting matrix
        + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
        +
        + // num users, which equals the maximum number of interactions per item
        + val numUsers = drmA.nrow.toInt
        +
        + // Compute & broadcast the number of interactions per thing in A
        + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
        — End diff –

        drmA is already binary here, so we could use colSums
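
A minimal in-core illustration of why the two aggregates coincide after binarization, in plain Scala rather than the DRM DSL; the matrix values are illustrative.

    val binary = Array(
      Array(1.0, 0.0, 1.0),
      Array(0.0, 0.0, 1.0))
    val columnSums    = binary.transpose.map(_.sum)
    val nonZeroCounts = binary.transpose.map(_.count(_ != 0.0).toDouble)
    assert(columnSums.sameElements(nonZeroCounts))   // both are (1.0, 0.0, 2.0)
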

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13713968

        — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala —
        @@ -0,0 +1,214 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.mahout.cf
        +
        +import org.apache.mahout.math._
        +import scalabindings._
        +import RLikeOps._
        +import drm._
        +import RLikeDrmOps._
        +import org.apache.mahout.sparkbindings._
        +import scala.collection.JavaConversions._
        +import org.apache.mahout.math.stats.LogLikelihood
        +import collection._
        +import org.apache.mahout.common.RandomUtils
        +import org.apache.mahout.math.function.

        {VectorFunction, Functions}

        +
        +
        +/**
        + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
        + * available at http://www.mapr.com/practical-machine-learning
        + *
        + * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
        + * Scalable Similarity-Based Neighborhood Methods with MapReduce
        + * ACM Conference on Recommender Systems 2012"
        + */
        +object CooccurrenceAnalysis extends Serializable {
        +
        + /** Compares (Int,Double) pairs by the second value */
        + private val orderByScore = Ordering.fromLessThan[(Int, Double)]

        { case ((_, score1), (_, score2)) => score1 > score2}

        +
        + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
        + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
        +
        + implicit val distributedContext = drmARaw.context
        +
        + // Apply selective downsampling, pin resulting matrix
        + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
        +
        + // num users, which equals the maximum number of interactions per item
        + val numUsers = drmA.nrow.toInt
        +
        + // Compute & broadcast the number of interactions per thing in A
        + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
        +
        + // Compute co-occurrence matrix A'A
        + val drmAtA = drmA.t %*% drmA
        +
        + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
        + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
        + bcastInteractionsPerItemA, crossCooccurrence = false)
        +
        + var indicatorMatrices = List(drmIndicatorsAtA)
        +
        + // Now look at cross-co-occurrences
        + for (drmBRaw <- drmBs) {
        + // Down-sample and pin other interaction matrix
        + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
        +
        + // Compute & broadcast the number of interactions per thing in B
        + val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts)
        — End diff –

        drmB is already binary here, so we could use colSums

        githubbot ASF GitHub Bot added a comment -

        Github user sscdotopen commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13714024

        — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala —
        @@ -0,0 +1,214 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.mahout.cf
        +
        +import org.apache.mahout.math._
        +import scalabindings._
        +import RLikeOps._
        +import drm._
        +import RLikeDrmOps._
        +import org.apache.mahout.sparkbindings._
        +import scala.collection.JavaConversions._
        +import org.apache.mahout.math.stats.LogLikelihood
        +import collection._
        +import org.apache.mahout.common.RandomUtils
        +import org.apache.mahout.math.function.

        {VectorFunction, Functions}

        +
        +
        +/**
        + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
        + * available at http://www.mapr.com/practical-machine-learning
        + *
        + * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
        + * Scalable Similarity-Based Neighborhood Methods with MapReduce
        + * ACM Conference on Recommender Systems 2012"
        + */
        +object CooccurrenceAnalysis extends Serializable {
        +
        + /** Compares (Int,Double) pairs by the second value */
        + private val orderByScore = Ordering.fromLessThan[(Int, Double)]

        { case ((_, score1), (_, score2)) => score1 > score2}

        +
        + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
        + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
        +
        + implicit val distributedContext = drmARaw.context
        +
        + // Apply selective downsampling, pin resulting matrix
        + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
        +
        + // num users, which equals the maximum number of interactions per item
        + val numUsers = drmA.nrow.toInt
        +
        + // Compute & broadcast the number of interactions per thing in A
        + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
        +
        + // Compute co-occurrence matrix A'A
        + val drmAtA = drmA.t %*% drmA
        +
        + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
        + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
        + bcastInteractionsPerItemA, crossCooccurrence = false)
        +
        + var indicatorMatrices = List(drmIndicatorsAtA)
        +
        + // Now look at cross-co-occurrences
        + for (drmBRaw <- drmBs)

        { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts) + + // Compute cross-co-occurrence matrix B'A + val drmBtA = drmB.t %*% drmA + + val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerThingB, bcastInteractionsPerItemA) + + indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA + + drmB.uncache() + }

        +
        + // Unpin downsampled interaction matrix
        + drmA.uncache()
        +
        + // Return list of indicator matrices
        + indicatorMatrices
        + }
        +
        + /**
        + * Compute loglikelihood ratio
        + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
        + **/
        + def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
        + numInteractionsWithAandB: Long, numInteractions: Long) =

        { + + val k11 = numInteractionsWithAandB + val k12 = numInteractionsWithA - numInteractionsWithAandB + val k21 = numInteractionsWithB - numInteractionsWithAandB + val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB + + LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) + }

        +
        + def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
        + bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
        + crossCooccurrence: Boolean = true) = {
        + drmBtA.mapBlock() {
        + case (keys, block) =>
        +
        + val llrBlock = block.like()
        + val numInteractionsB: Vector = bcastNumInteractionsB
        + val numInteractionsA: Vector = bcastNumInteractionsA
        +
        + for (index <- 0 until keys.size) {
        +
        + val thingB = keys(index)
        +
        + // PriorityQueue to select the top-k items
        + val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
        +
        + block(index, :.nonZeroes().foreach { elem =>
        + val thingA = elem.index
        + val cooccurrences = elem.get
        +
        + // exclude co-occurrences of the item with itself
        + if (crossCooccurrence || thingB != thingA) {
        + // Compute loglikelihood ratio
        + val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
        + cooccurrences.toLong, numUsers)
        + val candidate = thingA -> llrRatio
        +
        + // Enqueue item with score, if belonging to the top-k
        + if (topItemsPerThing.size < maxInterestingItemsPerThing)

        { + topItemsPerThing.enqueue(candidate) + }

        else if (orderByScore.lt(candidate, topItemsPerThing.head))

        { + topItemsPerThing.dequeue() + topItemsPerThing.enqueue(candidate) + }

        + }
        + }
        +
        + // Add top-k interesting items to the output matrix
        + topItemsPerThing.dequeueAll.foreach

        { + case (otherThing, llrScore) => + llrBlock(index, otherThing) = llrScore + }

        + }
        +
        + keys -> llrBlock
        + }
        + }
        +
        + /**
        + * Selectively downsample users and things with an anomalous amount of interactions, inspired by
        + * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
        + *
        + * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
        + */
        + def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
        +
        + implicit val distributedContext = drmM.context
        +
        + // Pin raw interaction matrix
        + val drmI = drmM.checkpoint()
        +
        + // Broadcast vector containing the number of interactions with each thing
        + val bcastNumInteractions = drmBroadcast(drmI.colCounts)
        +
        + val downSampledDrmI = drmI.mapBlock() {
        + case (keys, block) =>
        + val numInteractions: Vector = bcastNumInteractions
        +
        + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
        + val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
        +
        + val downsampledBlock = block.like()
        +
        + // Downsample the interaction vector of each user
        + for (userIndex <- 0 until keys.size) {
        +
        + val interactionsOfUser = block(userIndex, :
        +
        + //todo: can we trust getNumNonZeroElements or is this the upper limit? May have to actually count them?
        — End diff –

        you can trust getNumNonZeroElements.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13714291

        — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala —
        @@ -0,0 +1,214 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.mahout.cf
        +
        +import org.apache.mahout.math._
        +import scalabindings._
        +import RLikeOps._
        +import drm._
        +import RLikeDrmOps._
        +import org.apache.mahout.sparkbindings._
        +import scala.collection.JavaConversions._
        +import org.apache.mahout.math.stats.LogLikelihood
        +import collection._
        +import org.apache.mahout.common.RandomUtils
        +import org.apache.mahout.math.function.

        {VectorFunction, Functions}

        +
        +
        +/**
        + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
        + * available at http://www.mapr.com/practical-machine-learning
        + *
        + * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
        + * Scalable Similarity-Based Neighborhood Methods with MapReduce
        + * ACM Conference on Recommender Systems 2012"
        + */
        +object CooccurrenceAnalysis extends Serializable {
        +
        + /** Compares (Int,Double) pairs by the second value */
        + private val orderByScore = Ordering.fromLessThan[(Int, Double)]

        { case ((_, score1), (_, score2)) => score1 > score2}

        +
        + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
        + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
        +
        + implicit val distributedContext = drmARaw.context
        +
        + // Apply selective downsampling, pin resulting matrix
        + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
        +
        + // num users, which equals the maximum number of interactions per item
        + val numUsers = drmA.nrow.toInt
        +
        + // Compute & broadcast the number of interactions per thing in A
        + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
        — End diff –

colCounts, or whatever we call it, is just as efficient, is distributed, and tells the reader what the important value is.

        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/12#discussion_r13714354

        — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala —
        @@ -0,0 +1,214 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one or more
        + * contributor license agreements. See the NOTICE file distributed with
        + * this work for additional information regarding copyright ownership.
        + * The ASF licenses this file to You under the Apache License, Version 2.0
        + * (the "License"); you may not use this file except in compliance with
        + * the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.mahout.cf
        +
        +import org.apache.mahout.math._
        +import scalabindings._
        +import RLikeOps._
        +import drm._
        +import RLikeDrmOps._
        +import org.apache.mahout.sparkbindings._
        +import scala.collection.JavaConversions._
        +import org.apache.mahout.math.stats.LogLikelihood
        +import collection._
        +import org.apache.mahout.common.RandomUtils
        +import org.apache.mahout.math.function.

        {VectorFunction, Functions}

        +
        +
        +/**
        + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
        + * available at http://www.mapr.com/practical-machine-learning
        + *
        + * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
        + * Scalable Similarity-Based Neighborhood Methods with MapReduce
        + * ACM Conference on Recommender Systems 2012"
        + */
        +object CooccurrenceAnalysis extends Serializable {
        +
        + /** Compares (Int,Double) pairs by the second value */
        + private val orderByScore = Ordering.fromLessThan[(Int, Double)]

        { case ((_, score1), (_, score2)) => score1 > score2}

        +
        + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
        + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
        +
        + implicit val distributedContext = drmARaw.context
        +
        + // Apply selective downsampling, pin resulting matrix
        + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
        +
        + // num users, which equals the maximum number of interactions per item
        + val numUsers = drmA.nrow.toInt
        +
        + // Compute & broadcast the number of interactions per thing in A
        + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
        +
        + // Compute co-occurrence matrix A'A
        + val drmAtA = drmA.t %*% drmA
        +
        + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
        + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
        + bcastInteractionsPerItemA, crossCooccurrence = false)
        +
        + var indicatorMatrices = List(drmIndicatorsAtA)
        +
        + // Now look at cross-co-occurrences
        + for (drmBRaw <- drmBs)

        { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts) + + // Compute cross-co-occurrence matrix B'A + val drmBtA = drmB.t %*% drmA + + val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerThingB, bcastInteractionsPerItemA) + + indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA + + drmB.uncache() + }

        +
        + // Unpin downsampled interaction matrix
        + drmA.uncache()
        +
        + // Return list of indicator matrices
        + indicatorMatrices
        + }
        +
        + /**
        + * Compute loglikelihood ratio
        + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
        + **/
        + def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
        + numInteractionsWithAandB: Long, numInteractions: Long) =

        { + + val k11 = numInteractionsWithAandB + val k12 = numInteractionsWithA - numInteractionsWithAandB + val k21 = numInteractionsWithB - numInteractionsWithAandB + val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB + + LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) + }

        +
        + def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
        + bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
        + crossCooccurrence: Boolean = true) = {
        + drmBtA.mapBlock() {
        + case (keys, block) =>
        +
        + val llrBlock = block.like()
        + val numInteractionsB: Vector = bcastNumInteractionsB
        + val numInteractionsA: Vector = bcastNumInteractionsA
        +
        + for (index <- 0 until keys.size) {
        +
        + val thingB = keys(index)
        +
        + // PriorityQueue to select the top-k items
        + val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
        +
        + block(index, :.nonZeroes().foreach { elem =>
        + val thingA = elem.index
        + val cooccurrences = elem.get
        +
        + // exclude co-occurrences of the item with itself
        + if (crossCooccurrence || thingB != thingA) {
        + // Compute loglikelihood ratio
        + val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
        + cooccurrences.toLong, numUsers)
        + val candidate = thingA -> llrRatio
        +
        + // Enqueue item with score, if belonging to the top-k
        + if (topItemsPerThing.size < maxInterestingItemsPerThing)

        { + topItemsPerThing.enqueue(candidate) + }

        else if (orderByScore.lt(candidate, topItemsPerThing.head))

        { + topItemsPerThing.dequeue() + topItemsPerThing.enqueue(candidate) + }

        + }
        + }
        +
        + // Add top-k interesting items to the output matrix
        + topItemsPerThing.dequeueAll.foreach

        { + case (otherThing, llrScore) => + llrBlock(index, otherThing) = llrScore + }

        + }
        +
        + keys -> llrBlock
        + }
        + }
        +
        + /**
        + * Selectively downsample users and things with an anomalous amount of interactions, inspired by
        + * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
        + *
        + * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
        + */
        + def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
        +
        + implicit val distributedContext = drmM.context
        +
        + // Pin raw interaction matrix
        + val drmI = drmM.checkpoint()
        +
        + // Broadcast vector containing the number of interactions with each thing
        + val bcastNumInteractions = drmBroadcast(drmI.colCounts)
        +
        + val downSampledDrmI = drmI.mapBlock() {
        + case (keys, block) =>
        + val numInteractions: Vector = bcastNumInteractions
        +
        + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
        + val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
        +
        + val downsampledBlock = block.like()
        +
        + // Downsample the interaction vector of each user
        + for (userIndex <- 0 until keys.size) {
        +
        + val interactionsOfUser = block(userIndex, :
        +
        + //todo: can we trust getNumNonZeroElements or is this the upper limit? May have to actually count them?
        — End diff –

Got it; I'll remove the comment since we do rely on it in the code.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/12#discussion_r13714354 — Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala — @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf + +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import org.apache.mahout.sparkbindings._ +import scala.collection.JavaConversions._ +import org.apache.mahout.math.stats.LogLikelihood +import collection._ +import org.apache.mahout.common.RandomUtils +import org.apache.mahout.math.function. {VectorFunction, Functions} + + +/** + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", + * available at http://www.mapr.com/practical-machine-learning + * + * see also "Sebastian Schelter, Christoph Boden, Volker Markl: + * Scalable Similarity-Based Neighborhood Methods with MapReduce + * ACM Conference on Recommender Systems 2012" + */ +object CooccurrenceAnalysis extends Serializable { + + /** Compares (Int,Double) pairs by the second value */ + private val orderByScore = Ordering.fromLessThan [(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} + + def cooccurrences(drmARaw: DrmLike [Int] , randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, + maxNumInteractions: Int = 500, drmBs: Array[DrmLike [Int] ] = Array()): List[DrmLike [Int] ] = { + + implicit val distributedContext = drmARaw.context + + // Apply selective downsampling, pin resulting matrix + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) + + // num users, which equals the maximum number of interactions per item + val numUsers = drmA.nrow.toInt + + // Compute & broadcast the number of interactions per thing in A + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts) + + // Compute co-occurrence matrix A'A + val drmAtA = drmA.t %*% drmA + + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, + bcastInteractionsPerItemA, crossCooccurrence = false) + + var indicatorMatrices = List(drmIndicatorsAtA) + + // Now look at cross-co-occurrences + for (drmBRaw <- drmBs) { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts) + + // Compute cross-co-occurrence matrix B'A + val drmBtA = drmB.t %*% drmA + + 
val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
+       bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+     indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
+
+     drmB.uncache()
+   }
+
+   // Unpin downsampled interaction matrix
+   drmA.uncache()
+
+   // Return list of indicator matrices
+   indicatorMatrices
+ }
+
+ /**
+  * Compute loglikelihood ratio
+  * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+  **/
+ def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+                        numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+   val k11 = numInteractionsWithAandB
+   val k12 = numInteractionsWithA - numInteractionsWithAandB
+   val k21 = numInteractionsWithB - numInteractionsWithAandB
+   val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
+   LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+ }
+
+ def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+                       bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+                       crossCooccurrence: Boolean = true) = {
+   drmBtA.mapBlock() {
+     case (keys, block) =>
+
+       val llrBlock = block.like()
+       val numInteractionsB: Vector = bcastNumInteractionsB
+       val numInteractionsA: Vector = bcastNumInteractionsA
+
+       for (index <- 0 until keys.size) {
+
+         val thingB = keys(index)
+
+         // PriorityQueue to select the top-k items
+         val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+         block(index, ::).nonZeroes().foreach { elem =>
+           val thingA = elem.index
+           val cooccurrences = elem.get
+
+           // exclude co-occurrences of the item with itself
+           if (crossCooccurrence || thingB != thingA) {
+             // Compute loglikelihood ratio
+             val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+               cooccurrences.toLong, numUsers)
+             val candidate = thingA -> llrRatio
+
+             // Enqueue item with score, if belonging to the top-k
+             if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+               topItemsPerThing.enqueue(candidate)
+             } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+               topItemsPerThing.dequeue()
+               topItemsPerThing.enqueue(candidate)
+             }
+           }
+         }
+
+         // Add top-k interesting items to the output matrix
+         topItemsPerThing.dequeueAll.foreach {
+           case (otherThing, llrScore) =>
+             llrBlock(index, otherThing) = llrScore
+         }
+       }
+
+       keys -> llrBlock
+   }
+ }
+
+ /**
+  * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+  * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+  *
+  * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not
+  */
+ def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+   implicit val distributedContext = drmM.context
+
+   // Pin raw interaction matrix
+   val drmI = drmM.checkpoint()
+
+   // Broadcast vector containing the number of interactions with each thing
+   val bcastNumInteractions = drmBroadcast(drmI.colCounts)
+
+   val downSampledDrmI = drmI.mapBlock() {
+     case (keys, block) =>
+       val numInteractions: Vector = bcastNumInteractions
+
+       // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures
+       val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+       val downsampledBlock = block.like()
+
+       // Downsample the interaction vector of each user
+       for (userIndex <- 0 until keys.size) {
+
+         val interactionsOfUser = block(userIndex, ::)
+
+         //todo: can we trust getNumNonZeroElements or is this the upper limit? May have to actually count them?

— End diff –

got it, I'll remove the comment since we do rely on it in the code
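
        As an aside for readers following the diff above, here is a small worked example of the k11..k22 contingency-table mapping used in loglikelihoodRatio; the counts are invented purely for illustration:

              // Hypothetical counts, only to illustrate the mapping in loglikelihoodRatio above.
              val numInteractions          = 10000L // all users
              val numInteractionsWithA     = 1000L  // users who interacted with item A
              val numInteractionsWithB     = 500L   // users who interacted with item B
              val numInteractionsWithAandB = 100L   // users who interacted with both

              val k11 = numInteractionsWithAandB                                // A and B together -> 100
              val k12 = numInteractionsWithA - numInteractionsWithAandB         // A without B      -> 900
              val k21 = numInteractionsWithB - numInteractionsWithAandB         // B without A      -> 400
              val k22 = numInteractions - numInteractionsWithA -
                numInteractionsWithB + numInteractionsWithAandB                 // neither A nor B  -> 8600

              // These four cells are what get passed to LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22).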
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45923020

        numNonZeroElementsPerColumn? vs colSums?

        OK

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on the pull request: https://github.com/apache/mahout/pull/12#issuecomment-45923020 numNonZeroElementsPerColumn? vs colSums? OK
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user tdunning commented on the pull request:

        https://github.com/apache/mahout/pull/12#issuecomment-45934491

        I hate abbreviations. If you are asking about naming, use the long name.

        If you can assure binary, then going with what we already have would be
        nice.

        On Thu, Jun 12, 2014 at 10:34 AM, Pat Ferrel <notifications@github.com>
        wrote:

        > numNonZeroElementsPerColumn? vs colSums?
        >
        > OK
        >
        > —
        > Reply to this email directly or view it on GitHub
        > <https://github.com/apache/mahout/pull/12#issuecomment-45923020>.
        >

        Show
        githubbot ASF GitHub Bot added a comment - Github user tdunning commented on the pull request: https://github.com/apache/mahout/pull/12#issuecomment-45934491 I hate abbreviations. If you are asking about naming, use the long name. If you can assure binary, then going with what we already have would be nice. On Thu, Jun 12, 2014 at 10:34 AM, Pat Ferrel <notifications@github.com> wrote: > numNonZeroElementsPerColumn? vs colSums? > > OK > > — > Reply to this email directly or view it on GitHub > < https://github.com/apache/mahout/pull/12#issuecomment-45923020 >. >
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/mahout/pull/12

        Show
        githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/mahout/pull/12
        Hide
        hudson Hudson added a comment -

        SUCCESS: Integrated in Mahout-Quality #2653 (See https://builds.apache.org/job/Mahout-Quality/2653/)
        MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12 (pat: rev c1ca30872c622e513e49fc1bb111bc4b8a527d3b)

        • spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
        • math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
        • spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala
        • spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
        • math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
        • math/src/main/java/org/apache/mahout/math/MurmurHash.java
        • math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
        • spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
        • spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
        • math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
        • CHANGELOG
        Show
        hudson Hudson added a comment - SUCCESS: Integrated in Mahout-Quality #2653 (See https://builds.apache.org/job/Mahout-Quality/2653/ ) MAHOUT-1464 Cooccurrence Analysis on Spark (pat) closes apache/mahout#12 (pat: rev c1ca30872c622e513e49fc1bb111bc4b8a527d3b) spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala math/src/main/java/org/apache/mahout/math/MurmurHash.java math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala CHANGELOG
        Hide
        githubbot ASF GitHub Bot added a comment -

        GitHub user pferrel opened a pull request:

        https://github.com/apache/mahout/pull/18

        MAHOUT-1464

        The numNonZeroElementsPerColumn additions did not account for negative values; they only counted the positive non-zero values. Fixed this in both the in-core and distributed cases.

        I added a Functions.notEqual to Functions.java. It may be possible to do this with the other functions, but it wasn't obvious, so I wrote one. The test is in MatrixOpsSuite, where it is used.

        The distributed case was much simpler.

        Changed tests to include negative values.
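
        A minimal sketch of the behavioural difference being fixed (plain Scala for illustration, not the patch code itself):

              // A column with a negative entry: counting only values > 0 undercounts.
              val column = Seq(2.0, -5.0, 0.0, 3.0)

              val positiveOnly = column.count(_ > 0.0)  // 2 -- the old behaviour
              val nonZero      = column.count(_ != 0.0) // 3 -- what numNonZeroElementsPerColumn should report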

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/pferrel/mahout mahout-1464

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/mahout/pull/18.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #18


        commit 107a0ba9605241653a85b113661a8fa5c055529f
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-04T19:54:22Z

        added Sebastian's CooccurrenceAnalysis patch updated it to use current Mahout-DSL

        commit 16c03f7fa73c156859d1dba3a333ef9e8bf922b0
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-04T21:32:18Z

        added Sebastian's MurmurHash changes

        Signed-off-by: pferrel <pat@occamsmachete.com>

        commit c6adaa44c80bba99d41600e260bbb1ad5c972e69
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-05T16:52:23Z

        MAHOUT-1464 import cleanup, minor changes to examples for running on Spark Cluster

        commit 1d66e5726e71e297ef4a7a27331463ba363098c0
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T20:19:32Z

        scalatest for cooccurrence cross and self along with other CooccurrenceAnalyisi methods

        commit 766db0f9e7feb70520fbd444afcb910788f01e76
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T20:20:46Z

        Merge branch 'master' into mahout-1464

        commit e492976688cb8860354bb20a362d370405f560e1
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T20:50:07Z

        cleaned up test comments

        commit a49692eb1664de4b15de1864b95701a6410c80c8
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-06T21:09:55Z

        got those cursed .DS_Stores out of the branch and put an exclude in .gitignore

        commit 268290d28d4f83cc47a7e6baebc5eb4c53d7c8da
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-07T21:50:04Z

        Merge branch 'master' into mahout-1464

        commit 63b10704390e18f513cca30596b1d25e146a6edd
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-08T15:26:36Z

        Merge branch 'master' into mahout-1464

        commit ac00d7655c4cba5f6c6dcb4882be95656b17a834
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-09T14:11:43Z

        Merge branch 'master' into mahout-1464

        commit fb008efeae3d5f6f6ba350fbc2ef3944da1dcaef
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T02:17:27Z

        added 'colCounts' to a drm using the SparkEngine and MatrixOps, which, when used in cooccurrence, fixes the problem with non-boolean preference values

        commit 5b04cb31403e2521d9874ad5e14f28cd0af26c26
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T02:18:29Z

        Merge branch 'master' into mahout-1464

        commit e451a2a596f5ceda8d1b4990e97ad3d5673fdb5f
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T16:02:26Z

        fixed some things from Dmitiy's comments, primary being the SparkEngine accumulator was doing >= 0 instead of > 0

        commit 411e0e92b4721626b736d66c292926fa4fdbb530
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T17:43:21Z

        changing the name of drm.colCounts to drm.getNumNonZeroElements

        commit 9655fd70f69ed97eb2d6765928a0a1f7dd760281
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T18:32:03Z

        meant to say changing drm.colCounts to drm.numNonZeroElementsPerColumn

        commit a2001375d46c5946b671f89f5a7cff2e6a094ea8
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T18:34:32Z

        Merge branch 'master' into mahout-1464

        commit 2db06b5566c8dcccb382733613b2fab6c223b5de
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T18:51:54Z

        typo

        commit 0b689b8b879c4ac03b71cf504a9d0d78ffa6bfa5
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T20:03:45Z

        clean up test

        commit 32afbe5e552ab94979dd545d14cda17ebc9c018e
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-12T23:42:08Z

        one more fat finger error

        commit b91e5e98c47829a5cc099289f83e99e6bf317dd6
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-13T16:18:33Z

        did not account for negative values in the purely mathematical MatrixOps and SparkEngine version of numNonZeroElementsPerColumn so fixed this and added to tests

        commit 9f6fd902f95c7daf687ecb59698f78217dbf6b6b
        Author: pferrel <pat@occamsmachete.com>
        Date: 2014-06-13T16:43:46Z

        merging master to run new tests


        Show
        githubbot ASF GitHub Bot added a comment - GitHub user pferrel opened a pull request: https://github.com/apache/mahout/pull/18 MAHOUT-1464 The numNonZeroElementsPerColumn additions did not account for negative values, only counted the positive non-zero values. Fixed this in the in core and distributed case. I added to Functions.java to create a Functions.notEqual. It may be possible to do this with the other functions but it wasn't obvious so I wrote one. The test is in MatrixOpsSuite, where is it used. The distributed case was much simpler. Changed tests to include negative values. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pferrel/mahout mahout-1464 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/mahout/pull/18.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18 commit 107a0ba9605241653a85b113661a8fa5c055529f Author: pferrel <pat@occamsmachete.com> Date: 2014-06-04T19:54:22Z added Sebastian's CooccurrenceAnalysis patch updated it to use current Mahout-DSL commit 16c03f7fa73c156859d1dba3a333ef9e8bf922b0 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-04T21:32:18Z added Sebastian's MurmurHash changes Signed-off-by: pferrel <pat@occamsmachete.com> commit c6adaa44c80bba99d41600e260bbb1ad5c972e69 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-05T16:52:23Z MAHOUT-1464 import cleanup, minor changes to examples for running on Spark Cluster commit 1d66e5726e71e297ef4a7a27331463ba363098c0 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-06T20:19:32Z scalatest for cooccurrence cross and self along with other CooccurrenceAnalyisi methods commit 766db0f9e7feb70520fbd444afcb910788f01e76 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-06T20:20:46Z Merge branch 'master' into mahout-1464 commit e492976688cb8860354bb20a362d370405f560e1 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-06T20:50:07Z cleaned up test comments commit a49692eb1664de4b15de1864b95701a6410c80c8 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-06T21:09:55Z got those cursed .DS_Stores out of the branch and put an exclude in .gitignore commit 268290d28d4f83cc47a7e6baebc5eb4c53d7c8da Author: pferrel <pat@occamsmachete.com> Date: 2014-06-07T21:50:04Z Merge branch 'master' into mahout-1464 commit 63b10704390e18f513cca30596b1d25e146a6edd Author: pferrel <pat@occamsmachete.com> Date: 2014-06-08T15:26:36Z Merge branch 'master' into mahout-1464 commit ac00d7655c4cba5f6c6dcb4882be95656b17a834 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-09T14:11:43Z Merge branch 'master' into mahout-1464 commit fb008efeae3d5f6f6ba350fbc2ef3944da1dcaef Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T02:17:27Z added 'colCounts' to a drm using the SparkEngine and MatrixOps, which, when used in cooccurrence, fixes the problem with non-boolean preference values commit 5b04cb31403e2521d9874ad5e14f28cd0af26c26 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T02:18:29Z Merge branch 'master' into mahout-1464 commit e451a2a596f5ceda8d1b4990e97ad3d5673fdb5f Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T16:02:26Z fixed some things from Dmitiy's comments, primary being the SparkEngine accumulator was doing >= 0 instead of > 0 commit 411e0e92b4721626b736d66c292926fa4fdbb530 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T17:43:21Z changing the name of drm.colCounts to drm.getNumNonZeroElements 
commit 9655fd70f69ed97eb2d6765928a0a1f7dd760281 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T18:32:03Z meant to say changing drm.colCounts to drm.numNonZeroElementsPerColumn commit a2001375d46c5946b671f89f5a7cff2e6a094ea8 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T18:34:32Z Merge branch 'master' into mahout-1464 commit 2db06b5566c8dcccb382733613b2fab6c223b5de Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T18:51:54Z typo commit 0b689b8b879c4ac03b71cf504a9d0d78ffa6bfa5 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T20:03:45Z clean up test commit 32afbe5e552ab94979dd545d14cda17ebc9c018e Author: pferrel <pat@occamsmachete.com> Date: 2014-06-12T23:42:08Z one more fat finger error commit b91e5e98c47829a5cc099289f83e99e6bf317dd6 Author: pferrel <pat@occamsmachete.com> Date: 2014-06-13T16:18:33Z did not account for negative values in the purely mathematical MatrixOps and SparkEngine version of numNonZeroElementsPerColumn so fixed this and added to tests commit 9f6fd902f95c7daf687ecb59698f78217dbf6b6b Author: pferrel <pat@occamsmachete.com> Date: 2014-06-13T16:43:46Z merging master to run new tests
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on the pull request:

        https://github.com/apache/mahout/pull/18#issuecomment-46035704

        Accounting for possible negative values in matrix or drm columns.

        The drm case was a simple fix, but for the in-core case Functions.java was modified to include a "notEqual(value)" function. There may be some other way to do this, but it is a trivial function and now rather obvious.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on the pull request: https://github.com/apache/mahout/pull/18#issuecomment-46035704 Accounting for possible negative values in matrix of drm columns. drm case was a simple fix but in core Functions.java was modified to include a "notEqual(value)" function. There may be some other way to do this but it is a trivial function and now rather obvious.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on the pull request:

        https://github.com/apache/mahout/pull/18#issuecomment-46057614

        Bringing up a second PR from the same branch. You really just need to rebase the changes over current master. This may not merge well.

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on the pull request: https://github.com/apache/mahout/pull/18#issuecomment-46057614 Bringing up a second PR from same branch. You really need just to rebase the changes over current master. This may not merge well.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13770844

        — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java —
        @@ -1393,6 +1393,17 @@ public double apply(double a) {
        };
        }

        + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */
        + public static DoubleFunction notEqual(final double b) {
        + return new DoubleFunction() {
        +
        + @Override
        + public double apply(double a) {
        + return a != b ? 1 : 0;
        — End diff –

        Since you are returning doubles, the correct style is to say 1.0 or 0.0, not 1 or 0.

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13770844 — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java — @@ -1393,6 +1393,17 @@ public double apply(double a) { }; } + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */ + public static DoubleFunction notEqual(final double b) { + return new DoubleFunction() { + + @Override + public double apply(double a) { + return a != b ? 1 : 0; — End diff – Since you are returning doubles, correct style is to say 1.0 or 0.0 not 1 or 0
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13770924

        — Diff: math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala —
        @@ -123,4 +123,16 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite {

        }

        + test("numNonZeroElementsPerColumn") {
        + val a = dense(
        + (2, 3, 4),
        + (3, 4, 5),
        + (-5, 0, -1),
        + (0, 0, 1)
        + )
        +
        + a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4))
        — End diff –

        style: spacing?

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13770924 — Diff: math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala — @@ -123,4 +123,16 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite { } + test("numNonZeroElementsPerColumn") { + val a = dense( + (2, 3, 4), + (3, 4, 5), + (-5, 0, -1), + (0, 0, 1) + ) + + a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4)) — End diff – style: spacing?
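
        As a quick check of the expected value in the test above: counting the non-zero entries column by column, and counting negative entries as non-zero, the first column (2, 3, -5, 0) has 3, the second (3, 4, 0, 0) has 2, and the third (4, 5, -1, 1) has 4, which is exactly dvec(3, 2, 4).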
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13771023

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        Hm. isn't that is made obsolete with Sebastian's PR #17 ?

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13771023 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – Hm. isn't that is made obsolete with Sebastian's PR #17 ?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13771395

        — Diff: spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala —
        @@ -118,8 +118,8 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
        }

        test("cooccurrence [A'A], [B'A] integer data using LLR") {

        - val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))
        - val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, 100), (10, 100, 0, 1000, 0))
        + val a = dense((1000, 10, 0, 0, 0), (0, 0, -10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))

        — End diff –

        Not sure if cooccurrence test changes like that are necessary.

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13771395 — Diff: spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala — @@ -118,8 +118,8 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc } test("cooccurrence [A'A] , [B'A] integer data using LLR") { val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0)) val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, 100), (10, 100, 0, 1000, 0)) + val a = dense((1000, 10, 0, 0, 0), (0, 0, -10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0)) End diff – not sure if coocurrence test changes like that are necessary
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13771632

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        This is creating a Vector of non-zero counts per column, just like colSums is summing the column's values. The function is simply needed by aggregateColumns. If you are suggesting another way to do this, you'll have to be more explicit.

        #17 has nothing to do with that afaict. It is about finding non-zero elements in a Vector.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13771632 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – This is creating a Vector of non-zero counts per column, just like colSums is summing the column's values. The function is simply needed in the aggregateColumns. If you are suggesting another way to do this you'll have to be more explicit. #17 has nothing to do with that afaict. It is about finding non-zero elements in a Vector.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13771652

        — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java —
        @@ -1393,6 +1393,17 @@ public double apply(double a) {
        };
        }

        + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */
        + public static DoubleFunction notEqual(final double b) {
        + return new DoubleFunction() {
        +
        + @Override
        + public double apply(double a) {
        + return a != b ? 1 : 0;
        — End diff –

        Taken from "equal" in the same file. Changed one character. But I'll note the point.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13771652 — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java — @@ -1393,6 +1393,17 @@ public double apply(double a) { }; } + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */ + public static DoubleFunction notEqual(final double b) { + return new DoubleFunction() { + + @Override + public double apply(double a) { + return a != b ? 1 : 0; — End diff – Taken from "equal" in the same file. Changed one character. But I'll note the point.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13771890

        — Diff: spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala —
        @@ -118,8 +118,8 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc
        }

        test("cooccurrence [A'A], [B'A] integer data using LLR") {

        - val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))
        - val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, 100), (10, 100, 0, 1000, 0))
        + val a = dense((1000, 10, 0, 0, 0), (0, 0, -10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0))

        — End diff –

        We may want to check for illegal values at some place in the pipeline. This is here so I don't forget. At present a negative value is legal. If we make it illegal I want this to fail.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13771890 — Diff: spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala — @@ -118,8 +118,8 @@ class CooccurrenceAnalysisSuite extends FunSuite with MahoutSuite with MahoutLoc } test("cooccurrence [A'A] , [B'A] integer data using LLR") { val a = dense((1000, 10, 0, 0, 0), (0, 0, 10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0)) val b = dense((100, 1000, 10000, 10000, 0), (10000, 1000, 100, 10, 0), (0, 0, 10, 0, 100), (10, 100, 0, 1000, 0)) + val a = dense((1000, 10, 0, 0, 0), (0, 0, -10000, 10, 0), (0, 0, 0, 0, 100), (10000, 0, 0, 1000, 0)) End diff – We may want to check for illegal values at some place in the pipeline. This is here so I don't forget. At present a negative value is legal. If we make it illegal I want this to fail.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13772128

        — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java —
        @@ -1393,6 +1393,17 @@ public double apply(double a) {
        };
        }

        + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */
        + public static DoubleFunction notEqual(final double b) {
        + return new DoubleFunction() {
        +
        + @Override
        + public double apply(double a) {
        + return a != b ? 1 : 0;
        — End diff –

        Just relaying some historical discussion in Mahout. It was known to create a bug in my own Mahout commit once, which had sparked the discussion about constant formatting.

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13772128 — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java — @@ -1393,6 +1393,17 @@ public double apply(double a) { }; } + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */ + public static DoubleFunction notEqual(final double b) { + return new DoubleFunction() { + + @Override + public double apply(double a) { + return a != b ? 1 : 0; — End diff – just relaying some historical discussion in Mahout. It was known to create a bug in my own Mahout commit once. Which had sparked the discussion about constant formatting.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user dlyubimov commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13775208

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        I mean, shouldn't this specialized version be more efficient than an aggregate:

        def apply(f: Vector): Double = f.getNumNonZeroElements().toDouble

        Show
        githubbot ASF GitHub Bot added a comment - Github user dlyubimov commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13775208 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – i mean, shouldn't this specialized version be more effective than an aggregate: def apply(f: Vector): Double = f.getnumNonZeroElements().toDouble
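
        For clarity, the two alternatives discussed in this thread put side by side; this is a sketch based on the snippets quoted above, not the committed code:

              import org.apache.mahout.math.Vector
              import org.apache.mahout.math.function.{Functions, VectorFunction}

              // Count via the generic aggregation framework: maps every element to 0/1 and sums.
              val countViaAggregate = new VectorFunction {
                def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))
              }

              // Count via the specialized accessor suggested above: lets sparse vectors answer
              // from their structure instead of touching every element of a column view.
              val countViaAccessor = new VectorFunction {
                def apply(f: Vector): Double = f.getNumNonZeroElements().toDouble
              }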
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13781567

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        Nice. I didn't look deep enough to see that f is the column vector. I'll change that.

        While we are at it, I now know about A'A (which is the slim calc?) that doesn't really compute A'. If you do something similar for two different matrices, ```B.t %*% A```, does B.t ever get checkpointed?

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13781567 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – Nice. I didn't look deep enough to see that f is the column vector. I'll change that. While we are at it, I now know about A'A (which is the slim calc?) that doesn't really compute A'. If you do similar for two different matrices:``` B.t %*% A``` does B.t ever get checkpointed?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13781773

        — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java —
        @@ -1393,6 +1393,17 @@ public double apply(double a) {
        };
        }

        + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */
        + public static DoubleFunction notEqual(final double b) {
        + return new DoubleFunction() {
        +
        + @Override
        + public double apply(double a) {
        + return a != b ? 1 : 0;
        — End diff –

        Whenever I modify a mature file that someone else has created, my general rule is to stay with the style of the collective authors. Here I agree that 1.0 and 0.0 are better, but I'm hesitant to change it when 1 and 0 are used throughout the file and I don't want to change it everywhere. There is probably more chance of me messing something up accidentally than actually fixing something if I change the whole file. If this seems wrong let me know, but in past jobs we did this to avoid constant thrash over minor style disagreements.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13781773 — Diff: math/src/main/java/org/apache/mahout/math/function/Functions.java — @@ -1393,6 +1393,17 @@ public double apply(double a) { }; } + /** Constructs a function that returns <tt>a != b ? 1 : 0</tt>. <tt>a</tt> is a variable, <tt>b</tt> is fixed. */ + public static DoubleFunction notEqual(final double b) { + return new DoubleFunction() { + + @Override + public double apply(double a) { + return a != b ? 1 : 0; — End diff – Whenever I modify a mature file that someone else has created, my general rule is to stay with the style of the collective authors. Here I agree that the 1.0, 0.0 is better I'm hesitant to change it here when 1, and 0 are used throughout the file and I don't want to change it everywhere. There is probably more chance of me messing something up accidentally than actually fixing something if I change the whole file. If this seem wrong let me know but in past jobs we did this to avoid constant thrash over minor style disagreements.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/mahout/pull/18

        Show
        githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/mahout/pull/18
        Hide
        hudson Hudson added a comment -

        FAILURE: Integrated in Mahout-Quality #2655 (See https://builds.apache.org/job/Mahout-Quality/2655/)
        MAHOUT-1464 fixed bug counting only positive column elements, now counts all non-zero (pat) closes apache/mahout#18 (pat: rev c20eee89c6cc669494cf7edbb80255a83e194a15)

        • math/src/main/java/org/apache/mahout/math/function/Functions.java
        • spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala
        • math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
        • spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
        • spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala
        • math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
        Show
        hudson Hudson added a comment - FAILURE: Integrated in Mahout-Quality #2655 (See https://builds.apache.org/job/Mahout-Quality/2655/ ) MAHOUT-1464 fixed bug counting only positive column elements, now counts all non-zero (pat) closes apache/mahout#18 (pat: rev c20eee89c6cc669494cf7edbb80255a83e194a15) math/src/main/java/org/apache/mahout/math/function/Functions.java spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala spark/src/test/scala/org/apache/mahout/cf/CooccurrenceAnalysisSuite.scala math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user tdunning commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13782027

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        Using the vector aggregation framework will be very inefficient here. We should either use Seb's suggestion or add properly scalable aggregation that doesn't depend on getting a vector view of a column.

        Show
        githubbot ASF GitHub Bot added a comment - Github user tdunning commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13782027 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – Using the vector aggregation framework will be very inefficient here. We should either use Seb's suggestion or add properly scalable aggregation that doesn't depend on getting a vector view of a column.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user pferrel commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13782871

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        Used Sebastian's getNumNonZeroElements here. Is your issue with Dmitriy's suggestion? This is only for in-core matrices; the code used for drms is the stuff in SparkEngine, which accumulates using the nonZero iterator on row Vectors.

        Show
        githubbot ASF GitHub Bot added a comment - Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13782871 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – used Sebastian's getNumNonZeroElements here. Is your issue with Dmitriy's suggestion? This is only for in core matrices, the code used for drms is the stuff in SparkEngine, which accumulates using the nonZero iterator on row Vectors.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user tdunning commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/18#discussion_r13783816

        — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala —
        @@ -188,8 +188,8 @@ object MatrixOps

        { def apply(f: Vector): Double = f.sum }
        - private def vectorCountFunc = new VectorFunction {
        -   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0))
        + private def vectorCountNonZeroElementsFunc = new VectorFunction {
        +   def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0))

        — End diff –

        The issue I have is with the rowAggregation and columnAggregation API. It enforces row-by-row evaluation. A map-reduce API could evaluate in many different orders, could iterate by rows or by columns for either aggregation, and wouldn't require a custom VectorFunction for simple aggregations.

        Show
        githubbot ASF GitHub Bot added a comment - Github user tdunning commented on a diff in the pull request: https://github.com/apache/mahout/pull/18#discussion_r13783816 — Diff: math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala — @@ -188,8 +188,8 @@ object MatrixOps { def apply(f: Vector): Double = f.sum } private def vectorCountFunc = new VectorFunction { def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.greater(0)) + private def vectorCountNonZeroElementsFunc = new VectorFunction { + def apply(f: Vector): Double = f.aggregate(Functions.PLUS, Functions.notEqual(0)) End diff – The issue I have is with the rowAggregation and columnAggregation API. It enforces row by row evaluation. A map-reduce API could evaluate in many different orders and could iterate by rows or by columns for either aggregation and wouldn't require the a custom VectorFunction for simple aggregations.
        Hide
        pferrel Pat Ferrel added a comment -

        Finally got a Spark cluster working and this does run on it. Also using HDFS for I/O.

        I think there is more to do here, but I will treat that as enhancements and close now.

        Show
        pferrel Pat Ferrel added a comment - finally got a Spark cluster working and this does run on it. Also using HDFS for I/O. I think there is more to do here but will treat as enhancements and close now.
        Hide
        pferrel Pat Ferrel added a comment -

        WFM on HDFS 1.2.1 with Spark compiled to match.

        The problem that made this untestable on HDFS seems to be that the read code was broadcasting something that Kryo did not know how to handle. Fixed this in the driver/I/O code and this now works.

        Show
        pferrel Pat Ferrel added a comment - WFM on HDFS 1.2.1 Spark compiled to match. The problem that made this untestable on HDFS seems to be that the read code was broadcasting something that Kryo did not know how to handle. Fixed this in the driver/ I/O code and this now works.
        Hide
        pferrel Pat Ferrel added a comment -

        Argh, I finally got to looking at cross-cooccurrence with matrices of different dimensions. This made a rather easy-to-fix but nasty error obvious.

        If A is the primary self-similarity matrix we want to do A'A with, and B is the secondary matrix, we want to do A'B with it, since for use in a recommender we want the rows of the cooccurrence matrix to be IDed by the columns of A (items in A), right?

        The code does:

              // Compute cross-co-occurrence matrix B'A
              val drmBtA = drmB.t %*% drmA
        

        it should do:

              // Compute cross-co-occurrence matrix A'B
              val drmAtB = drmA.t %*% drmB
        

        Ted, can you confirm this? It's the way I've got it burned into my brain and applying ID translation with completely different IDs in B seems to confirm it.

        Show
        pferrel Pat Ferrel added a comment - Argh, I finally got to looking at cross-cooccurrence with matrices of different dimensions. This made a rather easy to fix but nasty error obvious. If A is the primary self-similairty matrix we want to do A'A with, and B is the secondary matrix, we want to do A'B with it since for use in a recommender we want rows of the cooccurrence to be IDed by the columns of A (items in A), right? The code does: // Compute cross-co-occurrence matrix B'A val drmBtA = drmB.t %*% drmA it should do: // Compute cross-co-occurrence matrix A'B val drmAtB = drmA.t %*% drmB Ted, can you confirm this? It's the way I've got it burned into my brain and applying ID translation with completely different IDs in B seems to confirm it.
        Hide
        tdunning Ted Dunning added a comment -

        If A is the primary self-similarity matrix we want to do A'A with, and B is the secondary matrix, we want to do A'B with it since for use in a recommender we want rows of the cooccurrence to be IDed by the columns of A (items in A), right?

        Yes. I think I understand the question.

        A'B should have row id's which are column id's of A. It should have column id's which are the column id's of B.

        Show
        tdunning Ted Dunning added a comment - If A is the primary self-similarity matrix we want to do A'A with, and B is the secondary matrix, we want to do A'B with it since for use in a recommender we want rows of the cooccurrence to be IDed by the columns of A (items in A), right? Yes. I think I understand the question. A'B should have row id's which are column id's of A. It should have column id's which are the column id's of B.
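
        To make the dimension bookkeeping concrete, here is a toy in-core sketch (sizes invented for illustration; the real computation runs on DRMs):

              import org.apache.mahout.math.scalabindings._
              import RLikeOps._

              // 3 users x 2 items in A, 3 users x 4 items in B
              val a = dense((1, 0), (0, 1), (1, 1))
              val b = dense((1, 0, 0, 1), (0, 1, 0, 0), (1, 0, 1, 0))

              // 2 x 4: rows are indexed by A's items (columns of A), columns by B's items (columns of B)
              val atb = a.t %*% b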
        Hide
        Andrew_Palumbo Andrew Palumbo added a comment -

        We can close this, right?

        Show
        Andrew_Palumbo Andrew Palumbo added a comment - We can close this, right?
        Hide
        pferrel Pat Ferrel added a comment -

        SimilarityAnalysis has cooccurrence for columns, cross-cooccurrence, and row similarity with LLR filtering. No other "similarity" measure is implemented. Something like cosine will require a different approach than A'A or AA' since it requires vector dot products, while LLR only looks at occurrence counts--maybe something like the legacy mapreduce code. Deferring other similarity measures.

        Show
        pferrel Pat Ferrel added a comment - SimilarityAnalysis has cooccurrence for columns, cross-cooccurrence too and row similarity with LLR filtering. No other "similarity" measure is implemented. Something like Cosine will require a different approach than A'A of AA' since it requires vector dot products while LLR only looks at occurrence counts--maybe something like the legacy mapreduce code. Deferring other similarity measures.
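
        To illustrate why cosine needs more than occurrence counts, a one-line sketch against the Mahout math Vector API (not code from this issue):

              import org.apache.mahout.math.Vector

              // Cosine needs the actual values of both vectors (a dot product and norms),
              // whereas LLR only needs the four co-occurrence counts.
              def cosine(a: Vector, b: Vector): Double = a.dot(b) / (a.norm(2) * b.norm(2))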
        Hide
        sslavic Stevo Slavic added a comment -

        Bulk closing all 0.10.0 resolved issues

        Show
        sslavic Stevo Slavic added a comment - Bulk closing all 0.10.0 resolved issues

          People

          • Assignee:
            pferrel Pat Ferrel
            Reporter:
            pferrel Pat Ferrel
          • Votes:
            0
            Watchers:
            12
