  Mahout / MAHOUT-1820

Add a method to generate Tuple<PartitionId, Partition elements count> to support the Flink backend

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: 0.11.2
    • Fix Version/s: 0.12.0
    • Component/s: Flink
    • Labels:
      None

      Description

      Add a method, countElementsPerPartition(), that returns a Tuple2<PartitionID, PartitionCount> for each partition. This is a temporary fix until the PR for FLINK-3657 is merged into the Flink codebase.
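To make the intended output concrete, here is a minimal sketch in plain Scala. It assumes a hypothetical in-memory stand-in for a partitioned DataSet, where each element of a Seq plays the role of one partition; the object name CountsPerPartitionSketch is illustrative and is not part of Mahout or Flink. Inside a real Flink RichMapPartitionFunction the partition index would typically come from getRuntimeContext.getIndexOfThisSubtask rather than from zipWithIndex.

```scala
object CountsPerPartitionSketch {
  // Hypothetical stand-in for a partitioned DataSet: element i of the Seq
  // plays the role of partition i. Each partition is a java.lang.Iterable,
  // matching the type Flink hands to mapPartition.
  def countsPerPartition[K](partitions: Seq[java.lang.Iterable[K]]): Seq[(Int, Int)] =
    partitions.zipWithIndex.map { case (partition, index) =>
      // Walk the Java iterator directly so no collection-converter import is needed.
      val it = partition.iterator()
      var count = 0
      while (it.hasNext) { it.next(); count += 1 }
      (index, count) // (PartitionIndex, PartitionElementCount)
    }

  def main(args: Array[String]): Unit = {
    // Helper that builds one fake partition from the given elements.
    def part(xs: String*): java.lang.Iterable[String] = {
      val l = new java.util.ArrayList[String]()
      xs.foreach(x => l.add(x))
      l
    }
    val result = countsPerPartition(Seq(part("a", "b"), part(), part("c", "d", "e")))
    println(result) // prints List((0,2), (1,0), (2,3))
  }
}
```

Empty partitions still produce a (index, 0) tuple, which is usually what a caller computing row offsets per partition wants.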

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user smarthi opened a pull request:

        https://github.com/apache/mahout/pull/207

        MAHOUT-1820: Add a method to generate Tuple<PartitionId, Partition elements count>> to support Flink backend

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/smarthi/mahout MAHOUT-1742

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/mahout/pull/207.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #207


        commit f81a6ff4a44f39f6a21048a17851479ae8da0d50
        Author: smarthi <smarthi@apache.org>
        Date: 2016-03-29T22:41:23Z

        MAHOUT-1820: Add a method to generate Tuple<PartitionId, Partition elements count>> to support Flink backend


        githubbot ASF GitHub Bot added a comment -

        Github user andrewpalumbo commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/207#discussion_r57815021

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala —
        @@ -0,0 +1,51 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing,
        + * software distributed under the License is distributed on an
        + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
        + * KIND, either express or implied. See the License for the
        + * specific language governing permissions and limitations
        + * under the License.
        + */
        +package org.apache.mahout.flinkbindings
        +
        +import java.lang.Iterable
        +
        +import org.apache.flink.api.common.functions.RichMapPartitionFunction
        +import org.apache.flink.api.scala._
        +import org.apache.flink.util.Collector
        +
        +import scala.collection._
        +
        +package object blas {
        +
        +  /**
        +   * To compute tuples (PartitionIndex, PartitionElementCount)
        +   *
        +   * @param drmDataSet
        +   * @tparam K
        +   * @return (PartitionIndex, PartitionElementCount)
        +   */
        +  //TODO: Remove this when FLINK-3657 is merged into Flink codebase and
        +  // replace by call to DataSetUtils.countElementsPerPartition(DataSet[K])
        +  private[mahout] def countsPerPartition[K](drmDataSet: DataSet[K]): DataSet[(Int, Int)] = {
        +    drmDataSet.mapPartition {
        +      new RichMapPartitionFunction[K, (Int, Int)] {
        +        override def mapPartition(iterable: Iterable[K], collector: Collector[(Int, Int)]) = {
        +          val count: Int = Iterator(iterable).size
        — End diff —

        +1 to this.

        I'm not sure if it matters, but you could use iterable.toIterator.size.
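It likely does matter. The line under review, Iterator(iterable).size, calls Iterator.apply with a single argument, so it builds an iterator over exactly one element (the Iterable itself) and reports 1 no matter how many elements the partition holds. Also, iterable here is a java.lang.Iterable, so a Scala method such as toIterator is only reachable through a Java-to-Scala collection conversion. The sketch below (hypothetical names PartitionCountDemo and countElements) shows the difference and sidesteps converters by walking the Java iterator directly; it is an illustration, not the code that was committed.

```scala
object PartitionCountDemo {
  // Counts the elements of a java.lang.Iterable by walking its java.util.Iterator.
  // This avoids depending on a particular Scala collection-converter import.
  def countElements[K](iterable: java.lang.Iterable[K]): Int = {
    val it = iterable.iterator()
    var n = 0
    while (it.hasNext) { it.next(); n += 1 }
    n
  }

  def main(args: Array[String]): Unit = {
    val partition = new java.util.ArrayList[String]()
    partition.add("a"); partition.add("b"); partition.add("c")

    // Iterator(partition) is Iterator.apply(elems*): an iterator over ONE element,
    // the collection itself, so its size is always 1.
    val buggy = Iterator(partition).size
    val fixed = countElements(partition)
    println(s"buggy=$buggy fixed=$fixed") // prints buggy=1 fixed=3
  }
}
```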

        githubbot ASF GitHub Bot added a comment -

        Github user smarthi commented on the pull request:

        https://github.com/apache/mahout/pull/207#issuecomment-203156203

        Committed to branch flink-binding

        githubbot ASF GitHub Bot added a comment -

        Github user smarthi closed the pull request at:

        https://github.com/apache/mahout/pull/207

        hudson Hudson added a comment -

        FAILURE: Integrated in Mahout-Quality #3324 (See https://builds.apache.org/job/Mahout-Quality/3324/)
        MAHOUT-1820:Add a method to generate Tuple<PartitionId, Partition (smarthi: rev 5863bbf1587cabef68f470fc8cd4812a2c1f8f79)

        • flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
        smarthi Suneel Marthi added a comment -

        Closing issues following Mahout 0.12.0 release on April 11, 2016


          People

          • Assignee: smarthi Suneel Marthi
          • Reporter: smarthi Suneel Marthi
          • Votes: 0
          • Watchers: 3
