Details

    • Type: New Feature
    • Status: Closed
    • Priority: Blocker
    • Resolution: Workaround
    • Affects Version/s: 0.11.2
    • Fix Version/s: 0.12.0
    • Component/s: Flink
    • Labels:
      None

      Description

Flink does not have in-memory caching analogous to that of Spark. We need to find a way to honour the checkpoint() contract in the Flink bindings.

        Activity

        githubbot ASF GitHub Bot added a comment -

        GitHub user andrewpalumbo opened a pull request:

        https://github.com/apache/mahout/pull/203

        MAHOUT-1817 Implement caching in Flink Bindings

As a temporary measure, use this method to persist the `DataSet` to the filesystem when caching, rather than drmDfsRead()/Write.

        Todo:

        1. Break up into `persist` and `readPersistedDataset` methods and only read a persisted dataset if it is already cached.
        2. Use a property setting for the base dir.
        3. Check to make sure that this method maintains the parallelism degree for the dataset; if not, set the new parallelism degree to the original.
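Todo items 1 and 2 might look roughly like the following minimal, self-contained sketch. The `persist`/`readPersistedDataSet` names come from the PR; the file-backed `Seq[String]` is only a stand-in for the real Flink `DataSet` I/O, and the `mahout.flink.cache.dir` property name is hypothetical:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Sketch of Todo 1 and 2: split caching into a persist step and a read step,
// persisting only on the first cache() call, with a configurable base dir.
object CacheSketch {
  // Todo 2: base dir from a property, falling back to the system temp dir.
  val persistanceRootDir: String =
    sys.props.getOrElse("mahout.flink.cache.dir", System.getProperty("java.io.tmpdir")) + "/"

  private var isCached = false
  private var cacheFileName: String = _

  // Stand-in for writing a DataSet to the filesystem.
  def persist(data: Seq[String], path: String): Unit =
    Files.write(Paths.get(path), data.mkString("\n").getBytes(StandardCharsets.UTF_8))

  // Stand-in for reading a persisted DataSet back.
  def readPersistedDataSet(path: String): Seq[String] =
    new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8).split("\n").toSeq

  // Todo 1: persist only on the first call, then always read the persisted copy.
  def cache(data: Seq[String]): Seq[String] = {
    if (!isCached) {
      cacheFileName = persistanceRootDir + System.nanoTime().toString
      persist(data, cacheFileName)
      isCached = true
    }
    readPersistedDataSet(cacheFileName)
  }
}
```

A second `cache()` call ignores its argument and returns the first persisted copy, mirroring the "only read a persisted dataset if it is already cached" requirement.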

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/andrewpalumbo/mahout MAHOUT-1817

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/mahout/pull/203.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #203


        commit a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
        Author: Andrew Palumbo <apalumbo@apache.org>
        Date: 2016-03-25T20:56:20Z

        Small change addressing DL's comment on apache/mahout#200, also a small fix


        githubbot ASF GitHub Bot added a comment -

        Github user smarthi commented on the pull request:

        https://github.com/apache/mahout/pull/203#issuecomment-201925974

        lgtm

        githubbot ASF GitHub Bot added a comment -

        Github user andrewpalumbo commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/203#discussion_r57523033

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala —
```
@@ -76,20 +94,31 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],

   override val keyClassTag: ClassTag[K] = classTag[K]

+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
     implicit val typeInformation = createTypeInformation[(K,Vector)]
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+    // We may want to look more closely at this:
+    // since we've cached a drm, triggering a computation
+    // it may not make sense to keep the same parallelism degree
+    if (!(parallelismDeg == _ds.getParallelism)) {
+      _ds.setParallelism(parallelismDeg).rebalance()
```

        — End diff —

        Not sure what to do here; this `rebalance` call seems to be adding some time to the caching call.

        githubbot ASF GitHub Bot added a comment -

        Github user smarthi commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/203#discussion_r57523062

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala —
```
@@ -76,20 +94,31 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],

   override val keyClassTag: ClassTag[K] = classTag[K]

+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
     implicit val typeInformation = createTypeInformation[(K,Vector)]
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+    // We may want to look more closely at this:
+    // since we've cached a drm, triggering a computation
+    // it may not make sense to keep the same parallelism degree
+    if (!(parallelismDeg == _ds.getParallelism)) {
+      _ds.setParallelism(parallelismDeg).rebalance()
```

        — End diff —

        Is it needed? IIRC it was added when we had skewed partitions in previous Flink releases.

        githubbot ASF GitHub Bot added a comment -

        Github user andrewpalumbo commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/203#discussion_r57523151

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala —
```
@@ -76,20 +94,31 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],

   override val keyClassTag: ClassTag[K] = classTag[K]

+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
     implicit val typeInformation = createTypeInformation[(K,Vector)]
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+    // We may want to look more closely at this:
+    // since we've cached a drm, triggering a computation
+    // it may not make sense to keep the same parallelism degree
+    if (!(parallelismDeg == _ds.getParallelism)) {
+      _ds.setParallelism(parallelismDeg).rebalance()
```

        — End diff —

        I guess my real question is whether or not to check and re-set the parallelism after reading in from the cache:

        ```
        if (!(parallelismDeg == _ds.getParallelism)) {
          _ds.setParallelism(parallelismDeg).rebalance()
        }
        ```

        Several operations may result in a much smaller matrix than the original DRM, but that is not calculated until here. It's really a case-by-case thing, so it's difficult to figure out.

        githubbot ASF GitHub Bot added a comment -

        Github user andrewpalumbo commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/203#discussion_r57523491

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala —
```
@@ -76,20 +94,31 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],

   override val keyClassTag: ClassTag[K] = classTag[K]

+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
     implicit val typeInformation = createTypeInformation[(K,Vector)]
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+    // We may want to look more closely at this:
+    // since we've cached a drm, triggering a computation
+    // it may not make sense to keep the same parallelism degree
+    if (!(parallelismDeg == _ds.getParallelism)) {
+      _ds.setParallelism(parallelismDeg).rebalance()
```

        — End diff —

        Yes @smarthi, so I will likely remove this check tomorrow and then push the rest as-is. I guess we need to go through and make sure that all operations are setting the parallelism if they need to.

        githubbot ASF GitHub Bot added a comment -

        Github user andrewpalumbo commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/203#discussion_r57533408

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala —
```
@@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],

   override val keyClassTag: ClassTag[K] = classTag[K]

+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
     implicit val typeInformation = createTypeInformation[(K,Vector)]
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    /** Leave the parallelism degree to be set by the operators
+      * TODO: find out a way to set the parallelism degree based on the
+      * final drm after computation is actually triggered
+      *
+      * // We may want to look more closely at this:
+      * // since we've cached a drm, triggering a computation
+      * // it may not make sense to keep the same parallelism degree
+      * if (!(parallelismDeg == _ds.getParallelism)) {
+      *   _ds.setParallelism(parallelismDeg).rebalance()
+      * }
+      */
```
        — End diff –

        Commented out the getParallelism setting in cache() for now; will push soon if no objections.

        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/mahout/pull/203

        Andrew_Palumbo Andrew Palumbo added a comment -

        I merged #203 as a workaround for this. I considered changing the checkpoint call to throw an exception for anything other than CacheHint.DISK_ONLY or CacheHint.NONE, but some of the tests must use other cache hints, so I left it as-is. There's still probably room for improvement here, but this should work for now.

        As implemented now, a checkpoint() call on a Drm will trigger a logical optimization and then create a physical Flink plan. After the plan is created, the cache call will assign a name to the Drm (if cache has not been called before) and persist the backing DataSet either to the directory specified by the taskmanager.tmp.dirs property, if set in a $MAHOUT_HOME/conf/flink-config.yaml file, or to /tmp if no such file exists or the property is not set. This triggers the evaluation of the physical plan. The backing DataSet is then re-read from the given directory, wrapped in a Drm, and returned by the cache function.

        If cache has already been called on this Drm, cache() simply reads the previously persisted DataSet from the filesystem, wraps it in a Drm, and returns it.

        There remains some question about how to reset the parallelism degree of the cached DataSet.
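        The directory-resolution rule described above can be sketched as follows. This is a hedged illustration, not the actual Mahout code: the naive line-by-line parse of the YAML file and the helper name `resolveCacheDir` are assumptions.

```scala
import java.io.File
import scala.io.Source

// Sketch of the rule: use taskmanager.tmp.dirs from
// $MAHOUT_HOME/conf/flink-config.yaml if present, else fall back to /tmp.
def resolveCacheDir(mahoutHome: String): String = {
  val conf = new File(mahoutHome, "conf/flink-config.yaml")
  if (conf.exists()) {
    val src = Source.fromFile(conf)
    try {
      src.getLines()
        .map(_.trim)
        .collectFirst { case line if line.startsWith("taskmanager.tmp.dirs:") =>
          line.substring("taskmanager.tmp.dirs:".length).trim
        }
        .filter(_.nonEmpty)          // an empty value also falls back to /tmp
        .getOrElse("/tmp")
    } finally src.close()
  } else "/tmp"                      // no config file at all: use /tmp
}
```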

        smarthi Suneel Marthi added a comment -

        This is commendable indeed, given the circumstances and the issues we had been seeing with the new backend integration. We definitely need to revisit or redo the present caching behavior to better optimize it.

        githubbot ASF GitHub Bot added a comment -

        Github user smarthi commented on a diff in the pull request:

        https://github.com/apache/mahout/pull/203#discussion_r57661303

        — Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala —
```
@@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],

   override val keyClassTag: ClassTag[K] = classTag[K]

+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
     implicit val typeInformation = createTypeInformation[(K,Vector)]
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    /** Leave the parallelism degree to be set by the operators
+      * TODO: find out a way to set the parallelism degree based on the
+      * final drm after computation is actually triggered
+      *
+      * // We may want to look more closely at this:
+      * // since we've cached a drm, triggering a computation
+      * // it may not make sense to keep the same parallelism degree
+      * if (!(parallelismDeg == _ds.getParallelism)) {
+      *   _ds.setParallelism(parallelismDeg).rebalance()
+      * }
+      */
```
        — End diff –

        In light of MAHOUT-1819, wherein it's been agreed that parallelism would only be set inside MahoutFlinkContext for the ExecutionEnvironment, I think this JIRA can be safely marked as 'Resolved'.

        Andrew_Palumbo Andrew Palumbo added a comment -

        Will revisit as Flink further implements caching.

        dlyubimov Dmitriy Lyubimov added a comment -

        bulk-closing resolved issues

        hudson Hudson added a comment -

        FAILURE: Integrated in Mahout-Quality #3324 (See https://builds.apache.org/job/Mahout-Quality/3324/)
        MAHOUT-1817: optimize caching workaround for Flink, squashed commit of (apalumbo: rev b67398f933d50d3e4f00ebd7ccd57f17b96604c7)

        • flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
        • flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
        • flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
        • flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala

          People

          • Assignee:
            Andrew_Palumbo Andrew Palumbo
            Reporter:
            Andrew_Palumbo Andrew Palumbo
          • Votes:
            0
          • Watchers:
            5

            Dates

            • Created:
              Updated:
              Resolved:
