MAHOUT-1755

Mahout DSL for Flink: Flush intermediate results to FS

    Details

    • Type: Task
    • Status: Closed
    • Priority: Minor
    • Resolution: Implemented
    • Affects Version/s: 0.10.2
    • Fix Version/s: 0.12.0
    • Component/s: Flink
    • Labels: None

      Description

      Currently Flink (unlike Spark) does not keep intermediate results in memory, so they need to be flushed to a file system and read back when required.
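
      As a rough illustration of the pattern (not the actual binding code), here is a minimal Flink batch sketch that writes an intermediate DataSet to the file system and reads it back; the path and the toy data are made up:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object FlushIntermediate {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Some intermediate result that a later computation needs again.
    val intermediate: DataSet[(Int, Double)] =
      env.fromElements((0, 1.0), (1, 2.0), (2, 3.0))

    // Flink does not keep this in memory across jobs, so flush it out...
    val path = "/tmp/mahout-flink/intermediate" // hypothetical path
    intermediate.writeAsCsv(path, writeMode = WriteMode.OVERWRITE)
    env.execute("flush intermediate result")

    // ...and read it back when the next computation requires it.
    val reread = env.readCsvFile[(Int, Double)](path)
    reread.print()
  }
}
```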


          Activity

          Dmitriy Lyubimov (dlyubimov) added a comment:

          But pipelines could be translated to a Flink graph, right? That way something like rowSums() or dfsWrite() (or nrow) would truly trigger Flink graph execution. The action is deferred, right?
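
          For reference, a minimal Samsara DSL sketch of this point (the context setup and output path are assumptions, not from the ticket): the `%*%` expression only composes the logical plan, while actions like rowSums() or dfsWrite() trigger the backend job.

```scala
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Assumes an implicit DistributedContext (e.g. from the Flink bindings)
// is already in scope; the output path is made up.
def deferredExample()(implicit ctx: DistributedContext): Unit = {
  val A = drmParallelize(dense((1.0, 2.0), (3.0, 4.0)), numPartitions = 2)

  // Purely logical: this only builds the operator DAG, nothing runs yet.
  val AtA = A.t %*% A

  // Actions translate the DAG into an actual backend job and execute it.
  val sums = AtA.rowSums()       // triggers execution, returns an in-core Vector
  AtA.dfsWrite("/tmp/AtA")       // triggers execution, writes the DRM to DFS
}
```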

          Alexey Grigorev (agrigorev) added a comment:

          Yes, each time something like rowSums is executed, a Flink job is created and executed, and the results are returned. These results are kept in memory, and there are no problems with that.

          Flushing is needed for checkpointing, but also when results of one calculation are needed in another, or for iterations. For example, here: https://github.com/alexeygrigorev/mahout/blob/flink-binding/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala#L109

          When I do `A = A - evdComponent`, it throws some exceptions, which, I think, would be solved if the results were flushed to the FS and then re-read.
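
          A minimal sketch of that workaround at the DSL level, assuming an implicit DistributedContext is in scope (the path and helper name are made up): materialize the intermediate result with dfsWrite, then read it back with drmDfsRead before reusing it.

```scala
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Hypothetical helper: flush an intermediate DRM to DFS and re-read it,
// so the next stage starts from materialized data instead of the live plan.
def flushAndReuse(A: DrmLike[Int], evdComponent: DrmLike[Int])
                 (implicit ctx: DistributedContext): DrmLike[_] = {
  val tmp = "/tmp/mahout-flink/A-minus-evd"   // made-up path

  // Forces execution of the pipeline so far and persists the result.
  (A - evdComponent).dfsWrite(tmp)

  // Read the materialized matrix back for the next computation.
  drmDfsRead(tmp)
}
```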

          Dmitriy Lyubimov (dlyubimov) added a comment:

          Interesting. This gets back to caching checkpoints: caching checkpoints are a way to optimize for common computational paths in DAGs.

          So the only way to implement checkpoints in Flink (other than with cacheHint = NONE) is to always dump stuff to DFS? If yes, this is a serious limitation, as it prevents the most effective form of caching, i.e. when the object trees themselves are reused. Even if the HDFS checkpoint hits a memory cache, we still need to spend time serializing and deserializing partitions for every new tiny bit of computation (such as taking a mean, or a sum, or reducing and re-broadcasting intermediate statistics). This loop is very common for algorithms running till convergence. If we have heavyweight scheduling inside the loop in these types of systems (as opposed to one-time scheduling outside the loop in superstep systems), that is already bad enough. If on top of that we need to serialize and deserialize when we run 50 convergence iterations, this is pretty disastrous.

          So there's absolutely no way to keep datasets as object trees inside the worker VMs between computations?
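
          For concreteness, a sketch (not from the ticket) of the kind of convergence loop being described, assuming an implicit DistributedContext; the update rule and names are placeholders. Under an FS-only checkpoint, each pass over A in this loop would imply serializing and deserializing the partitions again.

```scala
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._

// Illustrative convergence loop: a small scalar statistic is recomputed
// from a large distributed matrix on every iteration.
def iterateUntilConverged(A0: DrmLike[Int], eps: Double = 1e-6, maxIter: Int = 50)
                         (implicit ctx: DistributedContext): DrmLike[Int] = {
  var A = A0.checkpoint(CacheHint.NONE)  // per the discussion, anything but NONE means a DFS dump on Flink
  var delta = Double.MaxValue
  var it = 0
  while (delta > eps && it < maxIter) {
    val next = (A %*% A.t %*% A).checkpoint(CacheHint.NONE)  // placeholder update rule
    delta = (next - A).norm              // tiny result, but a full pass over the data
    A = next
    it += 1
  }
  A
}
```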

          Andrew Palumbo (Andrew_Palumbo) added a comment:

          Yes, the current workaround of persisting the DRM to the filesystem slows things down considerably. We need to figure out a better way; I've opened MAHOUT-1817 for this.

          Dmitriy Lyubimov (dlyubimov) added a comment:

          bulk-closing resolved issues


            People

            • Assignee: Suneel Marthi (smarthi)
            • Reporter: Alexey Grigorev (agrigorev)
            • Votes: 0
            • Watchers: 3

