Details

    • Type: Task
    • Status: Resolved
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 0.10.2
    • Fix Version/s: 0.13.0
    • Component/s: Flink, Math
    • Labels:
      None

      Description

      Currently, ABt is expressed through AtB, which is not optimal; we need a dedicated implementation for ABt.
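      For reference, AB' is the operation written as `A %*% B.t` in Mahout's R-like DSL (the logical operator OpABt). A minimal sketch of the contract the physical operator has to satisfy, assuming an implicit `DistributedContext` is already in scope; the matrices and the tolerance are illustrative only:

      ```
      // Sketch only: distributed AB' checked against the in-core product.
      import org.apache.mahout.math.scalabindings._
      import org.apache.mahout.math.scalabindings.RLikeOps._
      import org.apache.mahout.math.drm._
      import org.apache.mahout.math.drm.RLikeDrmOps._

      // Small in-core matrices used as ground truth.
      val inCoreA = dense((1, 2, 3), (3, 4, 5))
      val inCoreB = dense((3, 4, 5), (5, 6, 7))

      // Distributed row matrices (an implicit DistributedContext is assumed in scope).
      val A = drmParallelize(inCoreA, numPartitions = 2)
      val B = drmParallelize(inCoreB, numPartitions = 2)

      // The operator in question: AB', materialized back in core.
      val ABt = (A %*% B.t).collect

      // Compare against the in-core reference product.
      val control = inCoreA %*% inCoreB.t
      assert((ABt - control).norm < 1e-10)
      ```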

        Issue Links

          Activity

          Andrew Palumbo added a comment - edited

          Tracking the problems with what should be a rather straightforward implementation here: https://gist.github.com/andrewpalumbo/c42d41074410752a8712446dcd1f86dc

          Tracking the development here: https://github.com/andrewpalumbo/mahout/blob/MAHOUT-1750/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala

          Andrew Palumbo added a comment -

          Thanks @till for the help!

          ASF GitHub Bot added a comment -

          GitHub user andrewpalumbo opened a pull request:

          https://github.com/apache/mahout/pull/215

          MAHOUT-1750 - For Review. FlinkOpABt too many operations to implement?

          While (partially) implementing `FlinkOpABt`, it seems that there may be too many Flink map/reduce/group/etc. operations to finish using this method. *NOTE*: this is unfinished and is not numerically correct.

          Currently, when testing, I am getting Kryo stack overflow exceptions, which, as I understand, are often caused by a long chain of operations:
          ```
          Job execution failed.
          org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:716)
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
          at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
          at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
          Caused by: java.lang.StackOverflowError
          at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:74)
          at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
          at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
          at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
          at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)

          {...}

          ```
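          As an illustration of that failure mode, here is a minimal, self-contained sketch using plain Kryo (the `Node` type and the depth are invented for illustration; this is not Mahout or Flink code). Kryo's default FieldSerializer recurses once per object reference, so a deep enough object graph exhausts the stack with the same frames as the trace above:

          ```
          import com.esotericsoftware.kryo.Kryo
          import com.esotericsoftware.kryo.io.Output
          import java.io.ByteArrayOutputStream

          // A deliberately deep object graph: one set of serializer frames per element.
          case class Node(value: Int, next: Node)

          object KryoDepthDemo extends App {
            val deepChain = (1 to 200000).foldLeft(null: Node)((tail, i) => Node(i, tail))

            val kryo = new Kryo()
            kryo.setRegistrationRequired(false) // keep the demo self-contained
            val out = new Output(new ByteArrayOutputStream())

            // Expected to fail with java.lang.StackOverflowError inside Kryo.writeObject,
            // mirroring the DefaultClassResolver.writeClass frames in the trace above.
            kryo.writeClassAndObject(out, deepChain)
          }
          ```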

          Any comments are appreciated.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1750

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/mahout/pull/215.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #215


          commit fab6bb86fa1af0a7efce2300dcada45c32b35677
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-28T01:34:34Z

          start Stealing DL's Spark code

          commit 7e1a8bbaaca71036e9c0247f07a40368432f592b
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-28T01:55:01Z

          wip: Spark combineByKey => Flink groupReduce

          commit 31a644b36ed92c5aae3b80d5e8fbca7dd8a1d87a
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-28T22:38:31Z

          implement first step of the combiner

          commit 7dc73890ac5eeaad1ba38ab799b19b989c6ff918
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-28T22:53:09Z

          Add in type information for maps, etc.

          commit 5c60167f92a32b4c0751f6bf8dc12a838e1cde78
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-28T23:50:12Z

          Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1750

          commit 9b01a4a498c080e6063387608253890a13e77ad4
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-29T02:48:43Z

          remove all unnecessary type info; still not building with GroupCombineFunctions

          commit 37391a5a51b6803fdf893349a45a61903a6928e5
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-30T00:07:26Z

          wip

          commit d65e23b91f3c055c80de432dc9699c62512dd34e
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-30T00:07:40Z

          Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1750

          commit 3127d3218d9fa0fba35c9c240db63e71b00c8a32
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-03-30T01:01:15Z

          wip: still problems with error: object creation impossible, since method mapPartition in class RichMapPartitionFunction of type (x: Iterable[(Array[K1], org.apache.mahout.math.Matrix)], x: org.apache.flink.util.Collector[(Int, Array[K1], org.apache.mahout.math.Matrix)])Unit is not defined

          commit c19493b058e6d92f10fadeb32be9bafb14e7c671
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T05:17:03Z

          WIP

          commit b9ed381c2178392e785c4835adfac53b8d0ec5fb
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T08:41:58Z

          Revert "WIP"

          This reverts commit c19493b058e6d92f10fadeb32be9bafb14e7c671.

          commit 3311e89fbef3751476dbf113f680330b61f64160
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T08:51:20Z

          Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1750

          commit db21e7b12a0b4e05ff27fa9031fd8bf3e5f22bda
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T17:02:05Z

          Use java.lang.Iterable instead of scala.Iterable- this fixes the object creation issue.

          commit d3d60fadf50ceb12f559d4c94c9402735c8a7d30
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T21:45:03Z

          wip

          commit 0a83e122c82eb0584a00510f7fb571432e834064
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T22:11:26Z

          wip: error is

          Error:(180, 23) overriding method reduce in trait ReduceFunction of type (x: (Array[K], org.apache.mahout.math.Matrix), x: (Array[K], org.apache.mahout.math.Matrix))(Array[K], org.apache.mahout.math.Matrix);
          method reduce has incompatible type
          def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)) {
          ^
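          The error above is Scala's procedure syntax at work: `def reduce(...) { ... }` declares a `Unit` result, which cannot override `ReduceFunction.reduce`. Adding an explicit result type and `=`, as the PR's final code does, compiles; a simplified sketch with a plain reducer rather than the PR's matrix reducer:

          ```
          import org.apache.flink.api.common.functions.ReduceFunction

          // Procedure syntax (no '=' and no result type) would declare Unit and fail to
          // override ReduceFunction.reduce; that is exactly the error quoted above.
          // Spelling out the result type compiles:
          class SumReducer extends ReduceFunction[(String, Int)] {
            override def reduce(a: (String, Int), b: (String, Int)): (String, Int) =
              (a._1, a._2 + b._2)
          }
          ```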

          commit 97a6aea7c4fd0abc719605991322ac4acb758ea8
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-08T22:20:56Z

          wip: compiling now

          commit 088d925ba3cc5e93e25ad37f8e2fcf4d4698595f
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-09T00:44:22Z

          not returning any partitions. failing at dimensionality count in CheckpointedDrmFlink

          commit 82ae321b85b84e478152f208059f9c8146d814c8
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-09T01:01:05Z

          wip: need to make combiner into a merger

          commit 3c7c2ff937206b0882f6395058a8646ed1c0cdaa
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-09T14:29:38Z

          wip: comments, combiner

          commit cd2b59ac96b63c93ae18619cdf22b6c2742e93d3
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-09T19:58:29Z

          wip: kryo error

          commit d313ebc1618ae5cb44b5161b492bb03baba8216d
          Author: Andrew Palumbo <apalumbo@apache.org>
          Date: 2016-04-10T00:58:13Z

          Kryo stackOverflow


          ASF GitHub Bot added a comment -

          Github user rmetzger commented on the pull request:

          https://github.com/apache/mahout/pull/215#issuecomment-207982475

          How can I reproduce the issue?

          ASF GitHub Bot added a comment -

          Github user smarthi commented on a diff in the pull request:

          https://github.com/apache/mahout/pull/215#discussion_r59133538

          — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala —
          @@ -0,0 +1,246 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.mahout.flinkbindings.blas
          +
          +import org.apache.flink.api.common.functions._
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.scala.DataSet
          +import org.apache.flink.util.Collector
          +import org.apache.mahout.logging._
          +import org.apache.mahout.math.drm.{BlockifiedDrmTuple, DrmTuple}
          +import org.apache.mahout.math.drm.logical.{OpAB, OpABt}
          +import org.apache.mahout.math.scalabindings.RLikeOps._
          +import org.apache.mahout.math.scalabindings._
          +import org.apache.mahout.math.{Matrix, SparseMatrix, SparseRowMatrix}
          +import org.apache.mahout.flinkbindings._
          +import org.apache.mahout.flinkbindings.drm._
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.util.Collector
          +import org.apache.mahout.flinkbindings._
          +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
          +import org.apache.mahout.math.{Matrix, Vector}

          +import org.apache.mahout.math.drm._
          +import org.apache.mahout.math.drm.logical.OpABt
          +import org.apache.mahout.math.scalabindings.RLikeOps._
          +
          +import scala.collection.JavaConverters._
          +import scala.reflect.ClassTag
          +import org.apache.flink.api.scala.createTypeInformation
          +
          +/** Contains DataSet plans for ABt operator */
          +object FlinkOpABt {
          +
          + private final implicit val log = getLog(FlinkOpABt.getClass)
          +
          + /**
          + * General entry point for AB' operator.
          + *
          + * @param operator the AB' operator
          + * @param srcA A source DataSet
          + * @param srcB B source DataSet
          + * @tparam K
          + */
          + def abt[K: ClassTag: TypeInformation](
          + operator: OpABt[K],
          + srcA: FlinkDrm[K],
          + srcB: FlinkDrm[Int]): FlinkDrm[K] = {
          +
          + debug("operator AB'(Flink)")
          + abt_nograph[K](operator, srcA, srcB)
          + }

          +
          + /**
          + * Computes AB'
          + *
          + * General idea here is that we split both A and B vertically into blocks (one block per split),
          + * then compute cartesian join of the blocks of both data sets. This creates tuples of the form of
          + * (A-block, B-block). We enumerate A-blocks and transform this into (A-block-id, A-block, B-block)
          + * and then compute A-block %*% B-block', thus producing tuples (A-block-id, AB'-block).
          + *
          + * The next step is to group the above tuples by A-block-id and stitch al AB'-blocks in the group
          + * horizontally, forming single vertical block of the final product AB'.
          + *
          + * This logic is complicated a little by the fact that we have to keep block row and column keys
          + * so that the stitching of AB'-blocks happens according to integer row indices of the B input.
          + */
          + private[flinkbindings] def abt_nograph[K: ClassTag: TypeInformation](
          + operator: OpABt[K],
          + srcA: FlinkDrm[K],
          + srcB: FlinkDrm[Int]): FlinkDrm[K] = {
          +
          + // Blockify everything.
          + val blocksA = srcA.asBlockified
          + val blocksB = srcB.asBlockified
          +
          + val prodNCol = operator.ncol
          + val prodNRow = operator.nrow
          +
          +
          + // blockwise multiplication function
          + def mmulFunc(tupleA: (Array[K], Matrix), tupleB: (Array[Int], Matrix)): (Array[K], Array[Int], Matrix) = {
          + val (keysA, blockA) = tupleA
          + val (keysB, blockB) = tupleB
          +
          + var ms = traceDo(System.currentTimeMillis())
          +
          + // We need to send keysB to the aggregator in order to know which columns are being updated.
          + val result = (keysA, keysB, blockA %*% blockB.t)
          +
          + ms = traceDo(System.currentTimeMillis() - ms.get)
          + trace(
          + s"block multiplication of($

          {blockA.nrow}

          x$

          {blockA.ncol}

          x $

          {blockB.ncol}

          x$

          {blockB.nrow}

          is completed in $ms " +
          + "ms.")
          + trace(s"block multiplication types: blockA: $

          {blockA.getClass.getName}

          ($

          {blockA.t.getClass.getName}

          ); " +
          + s"blockB: $

          {blockB.getClass.getName}

          .")
          +
          + result.asInstanceOf[(Array[K], Array[Int], Matrix)]
          + }
          +
          +
          + implicit val typeInformation = createTypeInformation[(Array[K], Matrix)]
          + implicit val typeInformation2 = createTypeInformation[(Int, (Array[K], Array[Int], Matrix))]
          + implicit val typeInformation3 = createTypeInformation[(Array[K], Array[Int], Matrix)]
          +
          + val blockwiseMmulDataSet =
          +
          + // Combine blocks pairwise.
          + pairwiseApply(blocksA.asBlockified.ds, blocksB.asBlockified.ds, mmulFunc)
          +
          + // Now reduce proper product blocks.
          + // group by the partition key
          + .groupBy(0)
          +
          + // combine as transpose
          + .combineGroup(new GroupCombineFunction[(Int, (Array[K], Array[Int], Matrix)), (Array[K], Matrix)] {
          +
          + def combine(values: java.lang.Iterable[(Int, (Array[K], Array[Int], Matrix))],
          + out: Collector[(Array[K], Matrix)]): Unit = {
          + val tuple = values.iterator().next
          + val rowKeys = tuple._2._1
          + val colKeys = tuple._2._2
          + val block = tuple._2._3
          +
          + val comb = new SparseMatrix(prodNCol, block.nrow).t
          + for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
          + val res = (rowKeys, comb)
          +
          + out.collect(res)
          + }

          + })
          +
          + // reduce into a final Blockified matrix
          + .reduce(new ReduceFunction[(Array[K], Matrix)] {
          +
          + def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)): (Array[K], Matrix) = {
          + mx1._2 += mx2._2
          + mx1
          + }

          + })
          +
          +
          + new BlockifiedFlinkDrm(ds = blockwiseMmulDataSet, ncol = prodNCol)
          +
          + }
          + /**
          + * This function tries to use join instead of cartesian to group blocks together without bloating
          + * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away
          + * so if the data to one of the join parts is streaming, the result is still fitting to memory,
          + * since result size is much smaller than the operands.
          + *
          + * @param blocksA blockified DataSet for A
          + * @param blocksB blockified DataSet for B
          + * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
          + * switched to another scheme based on which of the sides, A or B, is bigger.
          + */
          + private def pairwiseApply[K1: ClassTag: TypeInformation, K2: ClassTag: TypeInformation,
          + T: ClassTag: TypeInformation]
          + ( blocksA: BlockifiedDrmDataSet[K1], blocksB: BlockifiedDrmDataSet[K2], blockFunc:
          + ((Array[K1], Matrix), (Array[K2], Matrix)) =>
          + (Array[K1], Array[Int], Matrix) ): DataSet[(Int, (Array[K1], Array[Int], Matrix))] = {
          +
          + implicit val typeInformationA = createTypeInformation[(Int, Array[K1], Matrix)]
          + implicit val typeInformationProd = createTypeInformation[(Int, (Array[K1], Array[Int], Matrix))]
          +
          + // We will be joining blocks in B to blocks in A using A-partition as a key.
          +
          + // Prepare A side.
          + val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
          + (Int, Array[K1], Matrix)] {
          + // partition number
          + var part: Int = 0
          +
          + // get the index of the partition
          + override def open(params: Configuration): Unit = {
          + part = getRuntimeContext.getIndexOfThisSubtask
          + }

          +
          + // bind the partition number to each keySet/block
          + def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
          +
          + val blockIter = values.iterator()
          + if (blockIter.hasNext()) {
          + val r = part -> blockIter.next
          + require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
          + out.collect((r._1, r._2._1, r._2._2))
          + }
          + }
          + })
          +
          + implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]
          +
          + // Prepare B-side.
          +// val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
          +
          + val blocksBKeyed = blocksB.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
          + (Int, Array[K2], Matrix)] {
          + // partition number
          + var part: Int = 0
          +
          + // get the index of the partition
          + override def open(params: Configuration): Unit = {
          + part = getRuntimeContext.getIndexOfThisSubtask
          + }
          +
          + // bind the partition number to each keySet/block
          + def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
          +
          + val blockIter = values.iterator()
          + if (blockIter.hasNext()) {
          + val r = part -> blockIter.next
          + require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
          + out.collect((r._1, r._2._1, r._2._2))
          + }
          + }
          + })
          +
          +
          + implicit val typeInformationJ = createTypeInformation[(Int, ((Array[K1], Matrix),(Int, (Array[K2], Matrix))))]
          + implicit val typeInformationJprod = createTypeInformation[(Int, T)]
          +
          +
          + // Perform the inner join.
          + val joined = blocksAKeyed.join(blocksBKeyed).where(0).equalTo(0)
          +
          + // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
          + val mapped = joined.rebalance().map{tuple => tuple._1._1 ->
          — End diff –

          Do you think rebalance() here is actually eliminating empty partitions?
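          For reference, `DataSet.rebalance()` redistributes elements round-robin across all parallel subtasks of the next operator. A small standalone sketch (data, parallelism, and names are illustrative, not from the PR) that makes the redistribution observable by tagging each element with the subtask that processed it:

          ```
          import org.apache.flink.api.scala._
          import org.apache.flink.api.common.functions.RichMapFunction

          object RebalanceProbe extends App {
            val env = ExecutionEnvironment.getExecutionEnvironment
            env.setParallelism(4)

            // Records which parallel subtask handled each element.
            class TagWithSubtask extends RichMapFunction[Int, (Int, Int)] {
              override def map(x: Int): (Int, Int) = (getRuntimeContext.getIndexOfThisSubtask, x)
            }

            val data = env.fromCollection(1 to 16)

            // Without rebalance: the distribution depends on how the source splits the data.
            data.map(new TagWithSubtask).print()

            // With rebalance: elements are spread round-robin over all four subtasks.
            data.rebalance().map(new TagWithSubtask).print()
          }
          ```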

          ASF GitHub Bot added a comment -

          Github user andrewpalumbo commented on a diff in the pull request:

          https://github.com/apache/mahout/pull/215#discussion_r59137058

          — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala —
          @@ -0,0 +1,246 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.mahout.flinkbindings.blas
          +
          +import org.apache.flink.api.common.functions._
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.scala.DataSet
          +import org.apache.flink.util.Collector
          +import org.apache.mahout.logging._
          +import org.apache.mahout.math.drm.{BlockifiedDrmTuple, DrmTuple}
          +import org.apache.mahout.math.drm.logical.{OpAB, OpABt}
          +import org.apache.mahout.math.scalabindings.RLikeOps._
          +import org.apache.mahout.math.scalabindings._
          +import org.apache.mahout.math.{Matrix, SparseMatrix, SparseRowMatrix}
          +import org.apache.mahout.flinkbindings._
          +import org.apache.mahout.flinkbindings.drm._
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.util.Collector
          +import org.apache.mahout.flinkbindings._
          +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
          +import org.apache.mahout.math.{Matrix, Vector}

          +import org.apache.mahout.math.drm._
          +import org.apache.mahout.math.drm.logical.OpABt
          +import org.apache.mahout.math.scalabindings.RLikeOps._
          +
          +import scala.collection.JavaConverters._
          +import scala.reflect.ClassTag
          +import org.apache.flink.api.scala.createTypeInformation
          +
          +/** Contains DataSet plans for ABt operator */
          +object FlinkOpABt {
          +
          + private final implicit val log = getLog(FlinkOpABt.getClass)
          +
          + /**
          + * General entry point for AB' operator.
          + *
          + * @param operator the AB' operator
          + * @param srcA A source DataSet
          + * @param srcB B source DataSet
          + * @tparam K
          + */
          + def abt[K: ClassTag: TypeInformation](
          + operator: OpABt[K],
          + srcA: FlinkDrm[K],
          + srcB: FlinkDrm[Int]): FlinkDrm[K] = {
          +
          + debug("operator AB'(Flink)")
          + abt_nograph[K](operator, srcA, srcB)
          + }

          +
          + /**
          + * Computes AB'
          + *
          + * General idea here is that we split both A and B vertically into blocks (one block per split),
          + * then compute cartesian join of the blocks of both data sets. This creates tuples of the form of
          + * (A-block, B-block). We enumerate A-blocks and transform this into (A-block-id, A-block, B-block)
          + * and then compute A-block %*% B-block', thus producing tuples (A-block-id, AB'-block).
          + *
          + * The next step is to group the above tuples by A-block-id and stitch al AB'-blocks in the group
          + * horizontally, forming single vertical block of the final product AB'.
          + *
          + * This logic is complicated a little by the fact that we have to keep block row and column keys
          + * so that the stitching of AB'-blocks happens according to integer row indices of the B input.
          + */
          + private[flinkbindings] def abt_nograph[K: ClassTag: TypeInformation](
          + operator: OpABt[K],
          + srcA: FlinkDrm[K],
          + srcB: FlinkDrm[Int]): FlinkDrm[K] = {
          +
          + // Blockify everything.
          + val blocksA = srcA.asBlockified
          + val blocksB = srcB.asBlockified
          +
          + val prodNCol = operator.ncol
          + val prodNRow = operator.nrow
          +
          +
          + // blockwise multiplication function
          + def mmulFunc(tupleA: (Array[K], Matrix), tupleB: (Array[Int], Matrix)): (Array[K], Array[Int], Matrix) = {
          + val (keysA, blockA) = tupleA
          + val (keysB, blockB) = tupleB
          +
          + var ms = traceDo(System.currentTimeMillis())
          +
          + // We need to send keysB to the aggregator in order to know which columns are being updated.
          + val result = (keysA, keysB, blockA %*% blockB.t)
          +
          + ms = traceDo(System.currentTimeMillis() - ms.get)
          + trace(
          + s"block multiplication of($

          {blockA.nrow}

          x$

          {blockA.ncol}

          x $

          {blockB.ncol}

          x$

          {blockB.nrow}

          is completed in $ms " +
          + "ms.")
          + trace(s"block multiplication types: blockA: $

          {blockA.getClass.getName}

          ($

          {blockA.t.getClass.getName}

          ); " +
          + s"blockB: $

          {blockB.getClass.getName}

          .")
          +
          + result.asInstanceOf[(Array[K], Array[Int], Matrix)]
          + }
          +
          +
          + implicit val typeInformation = createTypeInformation[(Array[K], Matrix)]
          + implicit val typeInformation2 = createTypeInformation[(Int, (Array[K], Array[Int], Matrix))]
          + implicit val typeInformation3 = createTypeInformation[(Array[K], Array[Int], Matrix)]
          +
          + val blockwiseMmulDataSet =
          +
          + // Combine blocks pairwise.
          + pairwiseApply(blocksA.asBlockified.ds, blocksB.asBlockified.ds, mmulFunc)
          +
          + // Now reduce proper product blocks.
          + // group by the partition key
          + .groupBy(0)
          +
          + // combine as transpose
          + .combineGroup(new GroupCombineFunction[(Int, (Array[K], Array[Int], Matrix)), (Array[K], Matrix)] {
          +
          + def combine(values: java.lang.Iterable[(Int, (Array[K], Array[Int], Matrix))],
          + out: Collector[(Array[K], Matrix)]): Unit = {
          + val tuple = values.iterator().next
          + val rowKeys = tuple._2._1
          + val colKeys = tuple._2._2
          + val block = tuple._2._3
          +
          + val comb = new SparseMatrix(prodNCol, block.nrow).t
          + for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
          + val res = (rowKeys, comb)
          +
          + out.collect(res)
          + }

          + })
          +
          + // reduce into a final Blockified matrix
          + .reduce(new ReduceFunction[(Array[K], Matrix)] {
          +
          + def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)): (Array[K], Matrix) = {
          + mx1._2 += mx2._2
          + mx1
          + }

          + })
          +
          +
          + new BlockifiedFlinkDrm(ds = blockwiseMmulDataSet, ncol = prodNCol)
          +
          + }
          + /**
          + * This function tries to use join instead of cartesian to group blocks together without bloating
          + * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away
          + * so if the data to one of the join parts is streaming, the result is still fitting to memory,
          + * since result size is much smaller than the operands.
          + *
          + * @param blocksA blockified DataSet for A
          + * @param blocksB blockified DataSet for B
          + * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
          + * switched to another scheme based on which of the sides, A or B, is bigger.
          + */
          + private def pairwiseApply[K1: ClassTag: TypeInformation, K2: ClassTag: TypeInformation,
          + T: ClassTag: TypeInformation]
          + ( blocksA: BlockifiedDrmDataSet[K1], blocksB: BlockifiedDrmDataSet[K2], blockFunc:
          + ((Array[K1], Matrix), (Array[K2], Matrix)) =>
          + (Array[K1], Array[Int], Matrix) ): DataSet[(Int, (Array[K1], Array[Int], Matrix))] = {
          +
          + implicit val typeInformationA = createTypeInformation[(Int, Array[K1], Matrix)]
          + implicit val typeInformationProd = createTypeInformation[(Int, (Array[K1], Array[Int], Matrix))]
          +
          + // We will be joining blocks in B to blocks in A using A-partition as a key.
          +
          + // Prepare A side.
          + val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
          + (Int, Array[K1], Matrix)] {
          + // partition number
          + var part: Int = 0
          +
          + // get the index of the partition
          + override def open(params: Configuration): Unit = {
          + part = getRuntimeContext.getIndexOfThisSubtask
          + }

          +
          + // bind the partition number to each keySet/block
          + def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
          +
          + val blockIter = values.iterator()
          + if (blockIter.hasNext()) {
          + val r = part -> blockIter.next
          + require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
          + out.collect((r._1, r._2._1, r._2._2))
          + }
          + }
          + })
          +
          + implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]
          +
          + // Prepare B-side.
          +// val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
          +
          + val blocksBKeyed = blocksB.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
          + (Int, Array[K2], Matrix)] {
          + // partition number
          + var part: Int = 0
          +
          + // get the index of the partition
          + override def open(params: Configuration): Unit = {
          + part = getRuntimeContext.getIndexOfThisSubtask
          + }
          +
          + // bind the partition number to each keySet/block
          + def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
          +
          + val blockIter = values.iterator()
          + if (blockIter.hasNext()) {
          + val r = part -> blockIter.next
          + require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
          + out.collect((r._1, r._2._1, r._2._2))
          + }
          + }
          + })
          +
          +
          + implicit val typeInformationJ = createTypeInformation[(Int, ((Array[K1], Matrix),(Int, (Array[K2], Matrix))))]
          + implicit val typeInformationJprod = createTypeInformation[(Int, T)]
          +
          +
          + // Perform the inner join.
          + val joined = blocksAKeyed.join(blocksBKeyed).where(0).equalTo(0)
          +
          + // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
          + val mapped = joined.rebalance().map{tuple => tuple._1._1 ->
          — End diff –

          @smarthi Not sure, I was testing a theory with that. I should take that out.

          Show
githubbot ASF GitHub Bot added a comment -

Github user andrewpalumbo commented on a diff in the pull request:

https://github.com/apache/mahout/pull/215#discussion_r59137058

— Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpABt.scala —

@@ -0,0 +1,246 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.flinkbindings.blas

import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.DataSet
import org.apache.flink.util.Collector
import org.apache.mahout.logging._
import org.apache.mahout.math.drm.{BlockifiedDrmTuple, DrmTuple}
import org.apache.mahout.math.drm.logical.{OpAB, OpABt}
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.{Matrix, SparseMatrix, SparseRowMatrix}
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.mahout.flinkbindings._
import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
import org.apache.mahout.math.{Matrix, Vector}
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.logical.OpABt
import org.apache.mahout.math.scalabindings.RLikeOps._

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.apache.flink.api.scala.createTypeInformation

/** Contains DataSet plans for ABt operator */
object FlinkOpABt {

  private final implicit val log = getLog(FlinkOpABt.getClass)

  /**
   * General entry point for AB' operator.
   *
   * @param operator the AB' operator
   * @param srcA A source DataSet
   * @param srcB B source DataSet
   * @tparam K
   */
  def abt[K: ClassTag: TypeInformation](
      operator: OpABt[K],
      srcA: FlinkDrm[K],
      srcB: FlinkDrm[Int]): FlinkDrm[K] = {

    debug("operator AB'(Flink)")
    abt_nograph[K](operator, srcA, srcB)
  }

  /**
   * Computes AB'
   *
   * General idea here is that we split both A and B vertically into blocks (one block per split),
   * then compute cartesian join of the blocks of both data sets. This creates tuples of the form of
   * (A-block, B-block). We enumerate A-blocks and transform this into (A-block-id, A-block, B-block)
   * and then compute A-block %*% B-block', thus producing tuples (A-block-id, AB'-block).
   *
   * The next step is to group the above tuples by A-block-id and stitch all AB'-blocks in the group
   * horizontally, forming a single vertical block of the final product AB'.
   *
   * This logic is complicated a little by the fact that we have to keep block row and column keys
   * so that the stitching of AB'-blocks happens according to integer row indices of the B input.
   */
  private[flinkbindings] def abt_nograph[K: ClassTag: TypeInformation](
      operator: OpABt[K],
      srcA: FlinkDrm[K],
      srcB: FlinkDrm[Int]): FlinkDrm[K] = {

    // Blockify everything.
    val blocksA = srcA.asBlockified
    val blocksB = srcB.asBlockified

    val prodNCol = operator.ncol
    val prodNRow = operator.nrow

    // blockwise multiplication function
    def mmulFunc(tupleA: (Array[K], Matrix), tupleB: (Array[Int], Matrix)): (Array[K], Array[Int], Matrix) = {
      val (keysA, blockA) = tupleA
      val (keysB, blockB) = tupleB

      var ms = traceDo(System.currentTimeMillis())

      // We need to send keysB to the aggregator in order to know which columns are being updated.
      val result = (keysA, keysB, blockA %*% blockB.t)

      ms = traceDo(System.currentTimeMillis() - ms.get)
      trace(
        s"block multiplication of(${blockA.nrow} x ${blockA.ncol} x ${blockB.ncol} x ${blockB.nrow} is completed in $ms " +
          "ms.")
      trace(s"block multiplication types: blockA: ${blockA.getClass.getName} (${blockA.t.getClass.getName}); " +
        s"blockB: ${blockB.getClass.getName}.")

      result.asInstanceOf[(Array[K], Array[Int], Matrix)]
    }

    implicit val typeInformation = createTypeInformation[(Array[K], Matrix)]
    implicit val typeInformation2 = createTypeInformation[(Int, (Array[K], Array[Int], Matrix))]
    implicit val typeInformation3 = createTypeInformation[(Array[K], Array[Int], Matrix)]

    val blockwiseMmulDataSet =

      // Combine blocks pairwise.
      pairwiseApply(blocksA.asBlockified.ds, blocksB.asBlockified.ds, mmulFunc)

        // Now reduce proper product blocks.
        // group by the partition key
        .groupBy(0)

        // combine as transpose
        .combineGroup(new GroupCombineFunction[(Int, (Array[K], Array[Int], Matrix)), (Array[K], Matrix)] {

          def combine(values: java.lang.Iterable[(Int, (Array[K], Array[Int], Matrix))],
                      out: Collector[(Array[K], Matrix)]): Unit = {
            val tuple = values.iterator().next
            val rowKeys = tuple._2._1
            val colKeys = tuple._2._2
            val block = tuple._2._3

            val comb = new SparseMatrix(prodNCol, block.nrow).t
            for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
            val res = (rowKeys, comb)

            out.collect(res)
          }
        })

        // reduce into a final Blockified matrix
        .reduce(new ReduceFunction[(Array[K], Matrix)] {

          def reduce(mx1: (Array[K], Matrix), mx2: (Array[K], Matrix)): (Array[K], Matrix) = {
            mx1._2 += mx2._2
            mx1
          }
        })

    new BlockifiedFlinkDrm(ds = blockwiseMmulDataSet, ncol = prodNCol)

  }

  /**
   * This function tries to use join instead of cartesian to group blocks together without bloating
   * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away
   * so if the data to one of the join parts is streaming, the result is still fitting to memory,
   * since result size is much smaller than the operands.
   *
   * @param blocksA blockified DataSet for A
   * @param blocksB blockified DataSet for B
   * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
   *                  switched to another scheme based on which of the sides, A or B, is bigger.
   */
  private def pairwiseApply[K1: ClassTag: TypeInformation, K2: ClassTag: TypeInformation,
      T: ClassTag: TypeInformation]
      (blocksA: BlockifiedDrmDataSet[K1], blocksB: BlockifiedDrmDataSet[K2], blockFunc:
        ((Array[K1], Matrix), (Array[K2], Matrix)) =>
          (Array[K1], Array[Int], Matrix)): DataSet[(Int, (Array[K1], Array[Int], Matrix))] = {

    implicit val typeInformationA = createTypeInformation[(Int, Array[K1], Matrix)]
    implicit val typeInformationProd = createTypeInformation[(Int, (Array[K1], Array[Int], Matrix))]

    // We will be joining blocks in B to blocks in A using A-partition as a key.

    // Prepare A side.
    val blocksAKeyed = blocksA.mapPartition(new RichMapPartitionFunction[(Array[K1], Matrix),
      (Int, Array[K1], Matrix)] {
      // partition number
      var part: Int = 0

      // get the index of the partition
      override def open(params: Configuration): Unit = {
        part = getRuntimeContext.getIndexOfThisSubtask
      }

      // bind the partition number to each keySet/block
      def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {

        val blockIter = values.iterator()
        if (blockIter.hasNext()) {
          val r = part -> blockIter.next
          require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
          out.collect((r._1, r._2._1, r._2._2))
        }
      }
    })

    implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]

    // Prepare B-side.
//    val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple)

    val blocksBKeyed = blocksB.mapPartition(new RichMapPartitionFunction[(Array[K2], Matrix),
      (Int, Array[K2], Matrix)] {
      // partition number
      var part: Int = 0

      // get the index of the partition
      override def open(params: Configuration): Unit = {
        part = getRuntimeContext.getIndexOfThisSubtask
      }

      // bind the partition number to each keySet/block
      def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {

        val blockIter = values.iterator()
        if (blockIter.hasNext()) {
          val r = part -> blockIter.next
          require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
          out.collect((r._1, r._2._1, r._2._2))
        }
      }
    })

    implicit val typeInformationJ = createTypeInformation[(Int, ((Array[K1], Matrix), (Int, (Array[K2], Matrix))))]
    implicit val typeInformationJprod = createTypeInformation[(Int, T)]

    // Perform the inner join.
    val joined = blocksAKeyed.join(blocksBKeyed).where(0).equalTo(0)

    // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
    val mapped = joined.rebalance().map{tuple => tuple._1._1 ->
```

— End diff –

@smarthi Not sure, I was testing a theory with that. I should take that out.
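
For reference, the blockwise plan quoted above only reorganizes the computation; whatever the physical operator does, its result has to match the plain in-core `A %*% B.t`. A minimal in-core sanity check with the Mahout math-scala DSL (the matrices here are made-up illustrative values, not taken from the patch or its tests) might look like this:

```scala
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._

// Two 2 x 3 operands; AB' is therefore 2 x 2.
val a = dense((1, 2, 3), (4, 5, 6))
val b = dense((1, 0, 1), (0, 1, 0))

// Each entry of AB' is a dot product of a row of A with a row of B,
// so the expected result here is {{4, 2}, {10, 5}}.
val abt = a %*% b.t
```
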
          githubbot ASF GitHub Bot added a comment -

          Github user andrewpalumbo commented on the pull request:

          https://github.com/apache/mahout/pull/215#issuecomment-208015484

@rmetzger To reproduce, you'd have to pull this branch and set the `$MAHOUT_HOME` environment variable to point at the base directory. Then, from `$MAHOUT_HOME`, run `mvn clean install -DskipTests && cd flink && mvn test -Dsuites="RLikeDrmOpsSuite"`.

          Or you could run the `RLikeDrmOpsSuite` suite from IntelliJ, but make sure that you set the `MAHOUT_HOME` env variable in the "Run/Debug Configurations" for the test.

          Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user andrewpalumbo commented on the pull request:

          https://github.com/apache/mahout/pull/215#issuecomment-208017898

Or rather: `mvn clean install -DskipTests && cd flink && mvn test -Dsuites="*RLikeDrmOpsSuite"` (note the leading `*` in the suite name pattern).

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the pull request:

          https://github.com/apache/mahout/pull/215#issuecomment-208029189

          I think the problem is the `TransposedMatrixView` class.

See:
          ![recursive-mahout](https://cloud.githubusercontent.com/assets/89049/14411877/13266a16-ff55-11e5-82ae-becb2f4f53af.png)

          And this code example

          ```scala
def main(args: Array[String]) {
  val ser = new KryoSerializer[TransposedMatrixView](classOf[TransposedMatrixView], new ExecutionConfig())
  val matrix = new SparseMatrix(15, 15)
  val inst = new TransposedMatrixView(matrix)
  ser.serialize(inst, new DataOutputSerializer(64))
}
          ```

          which will reproduce the error.
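
Shown only as an illustrative sketch under the assumption that the transposed view is what Kryo recurses on — this is not the change made in this PR — one generic way to keep the view out of serialization is to copy it into a concrete matrix before it crosses an operator boundary:

```scala
import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix}
import org.apache.mahout.math.scalabindings.RLikeOps._

// `backing.t` is a lightweight transposed view that still references `backing`.
// Copying it into a freshly allocated matrix yields a self-contained object, so a
// default field-walking serializer never has to follow the view's back-reference.
val backing: Matrix = new SparseMatrix(15, 15)
val view = backing.t
val materialized: Matrix = new DenseMatrix(view.nrow, view.ncol) := view
```
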

          githubbot ASF GitHub Bot added a comment -

          Github user andrewpalumbo commented on the pull request:

          https://github.com/apache/mahout/pull/215#issuecomment-208068896

          Thank you very much for looking into this, Robert. It is much appreciated. I am having trouble understanding what the issue is with this class. So you think that it is the Enumeration being returned by the `getStructure()` method in `TransposedMatrixView` that is the problem?

          smarthi Suneel Marthi added a comment - - edited

Given that the Flink folks acknowledge this to be a problem in the Flink codebase and have a fix in the pending Flink 1.0.2 release, can we revert the matrix setting back to 500 x 500?
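
For context, the 500 x 500 setting refers to the size of the randomly generated test matrices that the distributed suites parallelize. A sketch of that kind of check follows; the names, seeds, and tolerance are illustrative rather than the actual suite code, and it assumes a Flink `DistributedContext` is in implicit scope:

```scala
import org.apache.mahout.math.Matrices
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings.RLikeOps._

// Two random 500 x 500 in-core matrices as the control.
val inCoreA = Matrices.uniformView(500, 500, 1234)
val inCoreB = Matrices.uniformView(500, 500, 4321)

// Distribute, compute AB' on the engine, and collect the result back in core.
val drmA = drmParallelize(inCoreA, numPartitions = 2)
val drmB = drmParallelize(inCoreB, numPartitions = 2)
val abt = (drmA %*% drmB.t).collect

// Compare against the in-core product via the norm of the difference.
assert((abt - inCoreA %*% inCoreB.t).norm < 1e-6)
```
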

          githubbot ASF GitHub Bot added a comment -

          Github user andrewpalumbo closed the pull request at:

          https://github.com/apache/mahout/pull/215


People

    • Assignee: Unassigned
    • Reporter: agrigorev Alexey Grigorev
    • Votes: 0
    • Watchers: 5
