Mahout / MAHOUT-1819

Set the default Parallelism for Flink execution in FlinkDistributedContext

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: 0.11.2
    • Fix Version/s: 0.12.0
    • Component/s: Flink
    • Labels:
      None

      Description

      Remove the option to set the degree of parallelism on individual operators, and instead set it once in FlinkDistributedContext.
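      The design change can be illustrated with a minimal, dependency-free Scala sketch. It deliberately uses neither Flink nor Mahout. ParallelismContext, Operator, Plan.resolve and PlanDemo are hypothetical names. They only model the idea of fixing the degree of parallelism once on a shared context and letting every operator inherit it, instead of configuring each operator separately. In Flink itself the corresponding knob is the execution environment's default parallelism (ExecutionEnvironment.setParallelism), which operators use unless they override it. The sketch does not show how the actual commit wires this into FlinkDistributedContext.

```scala
// Hypothetical, dependency-free model of the MAHOUT-1819 idea.
// None of these names are part of the Mahout or Flink APIs.

/** Models a distributed context that owns one default degree of parallelism. */
final class ParallelismContext(val parallelism: Int) {
  require(parallelism > 0, "parallelism must be positive")
}

/** Models a logical operator; after the change it carries no parallelism of its own. */
final case class Operator(name: String)

object Plan {
  /** Pairs each operator with the parallelism it inherits from the shared context. */
  def resolve(ops: Seq[Operator])(implicit ctx: ParallelismContext): Seq[(String, Int)] =
    ops.map(op => op.name -> ctx.parallelism)
}

object PlanDemo {
  def main(args: Array[String]): Unit = {
    // Parallelism is set once, on the context, not on each operator.
    implicit val ctx: ParallelismContext = new ParallelismContext(4)
    println(Plan.resolve(Seq(Operator("mapBlock"), Operator("Ax"))))
    // prints List((mapBlock,4), (Ax,4))
  }
}
```

      The operator names passed in the demo are arbitrary strings. The point is that changing the single value on the context changes the effective parallelism of every operator in the plan.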

        Activity

        ASF GitHub Bot (githubbot) added a comment:

        GitHub user smarthi opened a pull request:

        https://github.com/apache/mahout/pull/206

        MAHOUT-1819: Set the default Parallelism for Flink execution in FlinkDistributedContext

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/smarthi/mahout MAHOUT-1819

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/mahout/pull/206.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #206


        commit 6bd4a87ca253450fb5e7fa9349c7dc1de3ce6f0b
        Author: smarthi <smarthi@apache.org>
        Date: 2016-03-28T20:12:14Z

        MAHOUT-1819: Set the default Parallelism for Flink execution in FlinkDistributedContext


        ASF GitHub Bot (githubbot) added a comment:

        Github user andrewpalumbo commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/206#discussion_r57629267

        --- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---
        @@ -249,12 +249,10 @@ object FlinkEngine extends DistributedEngine {
           override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
             FlinkByteBCast.wrap(v)
         
        -
           /** Broadcast support */
           override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
             FlinkByteBCast.wrap(m)
         
        -
           /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. */
           override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
        --- End diff --

        We should probably note in the comments that this breaks the drmParallelize() contract and ignores the numPartitions parameter.

        ASF GitHub Bot (githubbot) added a comment:

        Github user andrewpalumbo commented on the pull request:

        https://github.com/apache/mahout/pull/206#issuecomment-202571752

        +1

        ASF GitHub Bot (githubbot) added a comment:

        Github user smarthi commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/206#discussion_r57630038

        Yeah, will do. The only reason we still have numPartitions as a parameter is that, in its present form, this method is useful for Spark and H2O but not for Flink.
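        The contract issue raised in this exchange can be modelled with a small, dependency-free Scala sketch. Engine, PartitionHonouringEngine, ContextParallelismEngine, parallelizeRows and EngineDemo are hypothetical names, not Mahout's DistributedEngine API. One model honours the requested partition count, as the thread says is useful for Spark and H2O. The other accepts numPartitions only so that the shared signature still matches, ignores it, and uses the context's default parallelism instead. Its Scaladoc carries the kind of note the review asked for.

```scala
// Hypothetical, dependency-free model; not Mahout's DistributedEngine API.

/** Models an engine operation that distributes `numRows` rows. */
trait Engine {
  /** Returns how many partitions would be used to distribute `numRows` rows. */
  def parallelizeRows(numRows: Int, numPartitions: Int = 1): Int
}

/** Models a backend (for example Spark or H2O) that honours `numPartitions`. */
object PartitionHonouringEngine extends Engine {
  override def parallelizeRows(numRows: Int, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    numPartitions
  }
}

/**
 * Models the Flink backend after MAHOUT-1819.
 *
 * NOTE: this breaks the parallelize contract. `numPartitions` is accepted only
 * so that the signature matches the other engines; it is ignored, and the
 * context's default parallelism is used instead.
 */
final class ContextParallelismEngine(defaultParallelism: Int) extends Engine {
  override def parallelizeRows(numRows: Int, numPartitions: Int): Int =
    defaultParallelism
}

object EngineDemo {
  def main(args: Array[String]): Unit = {
    println(PartitionHonouringEngine.parallelizeRows(100, 8))        // prints 8
    println(new ContextParallelismEngine(4).parallelizeRows(100, 8)) // prints 4
  }
}
```

        With the same arguments the two models return different partition counts. Because the parameter is still visible in the signature, only the documentation tells a caller that the Flink-style engine ignores it.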

        ASF GitHub Bot (githubbot) added a comment:

        Github user smarthi closed the pull request at:

        https://github.com/apache/mahout/pull/206

        Hudson (hudson) added a comment:

        FAILURE: Integrated in Mahout-Quality #3324 (See https://builds.apache.org/job/Mahout-Quality/3324/)
        MAHOUT-1819:Set the default Parallelism for Flink execution in (smarthi: rev f0e22e28c8da53a0c26233afd4d363dcc035ee7d)

        • flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
        • flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
        • flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
        • flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
        Suneel Marthi (smarthi) added a comment:

        Closing issues following Mahout 0.12.0 release on April 11, 2016


          People

          • Assignee:
            smarthi Suneel Marthi
            Reporter:
            smarthi Suneel Marthi
          • Votes:
            0
            Watchers:
            3