  1. Samza
  2. SAMZA-863

Support multi-threading in samza tasks

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.11
    • Fix Version/s: 0.11
    • Component/s: None
    • Labels: None

      Description

      Currently a Samza container executes its tasks sequentially in a single thread. For example, suppose message 1 is in the pending queue for task 1 and message 2 is in the pending queue for task 2. Task 1 will process message 1, and until it completes, task 2 cannot process message 2. If we want to handle more messages in parallel, we have to increase the container count, e.g. from 1 to 2 in this example.

      While this solution has been working for many CPU-bound job scenarios, we do see its drawback for IO-bound jobs. In such jobs, the task makes IO/network requests, e.g. database calls, REST calls, or external-service RPC calls. These IO calls significantly slow down the task processing. We can increase the container count in order to parallelize the IO calls, but this results in low CPU utilization. If we try to improve CPU utilization by allocating multiple containers on the same CPU core, it will still cause dramatic memory growth due to the memory allocated for each container.

      To better scale the performance of IO-bound jobs, we are proposing to support multi-threaded processing in Samza. The design proposal will come soon.
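      As a rough illustration of the callback-based model such a feature implies (all names here are hypothetical sketches, not the final Samza API from the linked review boards), the run loop would dispatch each message without blocking on its completion, and the task would signal completion via a callback from any thread:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical callback interface: the task invokes it when processing is done.
interface TaskCallback {
  void complete();
  void failure(Throwable t);
}

// Hypothetical async task: processAsync() returns immediately and completes later.
interface AsyncTask {
  void processAsync(String envelope, TaskCallback callback);
}

public class AsyncLoopSketch {
  // Dispatch every message without waiting for the previous one to finish;
  // block only at the end until all callbacks have fired.
  static int processAll(List<String> messages) throws InterruptedException {
    ExecutorService ioPool = Executors.newFixedThreadPool(4);
    CountDownLatch done = new CountDownLatch(messages.size());
    AtomicInteger completed = new AtomicInteger();

    // The "IO call" here is simulated by handing work to a thread pool.
    AsyncTask task = (envelope, callback) ->
        ioPool.submit(() -> callback.complete());

    TaskCallback cb = new TaskCallback() {
      public void complete() { completed.incrementAndGet(); done.countDown(); }
      public void failure(Throwable t) { done.countDown(); }
    };

    for (String m : messages) {
      task.processAsync(m, cb);  // no per-message blocking in the run loop
    }
    done.await();
    ioPool.shutdown();
    return completed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(processAll(List.of("message 1", "message 2")) + " messages processed");
  }
}
```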

      Review boards:
      https://reviews.apache.org/r/48243/: SAMZA-961: Async tasks and multithreading model

      https://reviews.apache.org/r/48213/: SAMZA-960: Make system producer thread safe

      https://reviews.apache.org/r/48182/: SAMZA-958: Make store/cache thread safe

      1. DESIGN-SAMZA-863-0.pdf
        367 kB
        Xinyu Liu
      2. DESIGN-SAMZA-863-1.pdf
        405 kB
        Xinyu Liu
      3. DESIGN-SAMZA-863-2.pdf
        417 kB
        Xinyu Liu
      4. DESIGN-SAMZA-863-3.pdf
        417 kB
        Xinyu Liu
      5. perf-test-results.pdf
        144 kB
        Xinyu Liu
      6. SAMZA-863.0.patch
        189 kB
        Xinyu Liu
      7. SAMZA-863.1.patch
        189 kB
        Xinyu Liu

        Activity

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        +1 on the review. Merged and submitted. Thanks!

        xinyu Xinyu Liu added a comment -

        Attached the performance testing results. Key take-away from the results:
        1) Multithreading improves the task processing rate and CPU utilization with no memory cost. Even with a single-threaded pool for task execution, the performance improved greatly.
        2) Allowing concurrency within a single task further improves the performance, with a higher cost of CPU usage.
        3) CachedStore synchronization introduces a very small cost to performance.
        4) Running all tasks in the same thread of the new AsyncRunLoop is subpar compared to running in the existing RunLoop.

        xinyu Xinyu Liu added a comment -

        I've been working on the cache store synchronization, so it took longer to get back to you guys. I uploaded a new version of the design doc, incorporating the feedback from the previous comments. The major changes in the design doc are in section 3.2, where I added the design for window() and callback timeout. In summary:

        • window() will support the current semantics via configuration: task.window.insync=true (default). Once this is configured, window() will be invoked only when all concurrent process() calls are complete, so there is no race condition between process() and window(). With this support, I don't see any further need for AsyncWindowableTask. I can create another ticket for it if we ever see the need.
        • callback will support a timeout via configuration: task.callback.timeout.ms (default is none, so no timeout). Once this is configured, a callback that does not fire in time will be timed out, and the container will either shut down or ignore the failure based on config: task.callback.timeout.op.
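        Putting the two settings together, a job opting into the legacy window semantics with a callback timeout might configure something like the following (property names as given in this comment; the values are illustrative, not defaults):

```properties
# window() runs only after all concurrent process() calls complete (default)
task.window.insync=true
# fail any callback that has not fired within 5 seconds (illustrative value)
task.callback.timeout.ms=5000
# on timeout, shut the container down (the other option per this comment: ignore)
task.callback.timeout.op=shutdown
```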

        Yi Pan (Data Infrastructure) Thanks for reviewing the doc. For your remaining questions:
        1) I missed the wrappedTask.init() in the sample code of ThreadedStreamTask. Thanks for pointing this out.
        2) I think the section subject caused the confusion. Inside 4.2, I renamed the ThreadedStreamTask section to "Samza built-in multithreaded task", and the ParSeqStreamTask section to "User-provided multithreaded task", so ThreadedStreamTask is a Samza built-in class (also internal), while any other AsyncStreamTask will be a user implementation, such as the ParSeq example. Hopefully this clarifies the purpose better.

        nickpan47 Yi Pan (Data Infrastructure) added a comment -

        Xinyu Liu, thanks for the design doc and sorry for the late review. It looks great to me. I have just a few additional points, given that Chris Pettitt and you have already discussed a lot:

        1. What’s the sequence of wrappedTask.init() and ThreadedStreamTask.init()? I assume that ThreadedStreamTask is not a public API, so we control the implementation of its init()?
        2. What’s the user code vs what’s Samza framework code? Is the implementation of ThreadedStreamTask and ParSeqStreamTask being “reference implementation” of user code? Or the interface AsyncStreamTask is more of SPI instead of API and framework developer should implement it as a service/lib to be used by application developers?
        3. As for the callbacks not triggered issue, I think that we should implement the timeout mechanism in the TaskCallback. Depending on whether the implementation enforces in-order execution of the callbacks, the failure of the first callback may or may not trigger the failure of all pending callbacks. This should be implemented in SamzaContainer or TaskInstance class as well.
        4. For the race condition between window and process that Chris Pettitt pointed out, I think retaining the current semantics may be critical for existing users. If a user chooses to implement the WindowableTask interface, we should make sure that all pending process() calls are done before we invoke the user-implemented window() method. With the assumptions that a) the window() method is not invoked often, and b) timeouts on callbacks will not block window() invocation forever, I think that would be a reasonable solution. Also, do we see any need for AsyncWindowableTask? We can probably mention it as an "extra feature" not in the first design.

        Thanks!
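        The quiescence rule in point 4 can be sketched as a small gate (an illustrative sketch with hypothetical names, not code from the design doc): count in-flight process() calls and let window() run only once the count drops to zero.

```java
// Illustrative sketch: window() waits for quiescence of pending process() calls.
public class WindowGate {
  private int inFlight = 0;

  // Called by the run loop when a process() call is dispatched.
  synchronized void processStarted() { inFlight++; }

  // Called from the task callback when a process() call completes.
  synchronized void processFinished() {
    if (--inFlight == 0) notifyAll();
  }

  // Blocks until all pending process() calls complete, then runs window().
  synchronized void runWindow(Runnable window) throws InterruptedException {
    while (inFlight > 0) wait();
    window.run();
  }

  public static void main(String[] args) throws InterruptedException {
    WindowGate gate = new WindowGate();
    gate.processStarted();
    gate.processFinished();
    gate.runWindow(() -> System.out.println("window() ran after quiescence"));
  }
}
```

Note the assumption a) above: because window() is invoked rarely, the extra blocking here is an acceptable cost.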

        cpettitt-linkedin Chris Pettitt added a comment -

        Thanks Xinyu Liu. Your comments above clarified something for me: we do not currently support multi-tenancy in Samza, and it is totally out of scope for this proposal.

        I'm still getting grounded in Samza and its use cases, so take all of my suggestions with a grain of salt - I'd certainly be interested in input from others with more Samza experience.

        Regarding window(), it seems reasonable to be able to configure invocation of window() and process() to be mutually exclusive. For reasons of responsiveness you might want to schedule window with higher priority than process (i.e. don't wait for quiescence before calling window). That is just a hunch.

        For the case where TaskCallback is never invoked: since we're not yet dealing with multi-tenancy, we only need to understand the impact on a single task (type) for a single owner (I assume). At the very least we should have some way to alert if execution stalls (this would be a good idea independent of this change).

        Let's say we're going to time out the operation - what should be the final result? Should we a) shut down the task altogether, b) retry the task, or c) ignore and continue? a) seems safest in the absence of other constraints. b) would be possible if we knew the retry were idempotent (probably difficult to guarantee, though), and c) would only be reasonable if the task owner was willing to accept less accurate information. All three could fail again immediately (e.g. due to a remote call that is taking too long to process). A totally different way to look at this (again, since we're not dealing with multi-tenancy) is that d) we just allow the operation to continue until it finishes.

        A few more things to consider: remote calls made with LinkedIn tech all have an upper bound on how long they will wait before timing out, and we encourage users of ParSeq, where appropriate, to add app-specific timeouts. IIRC, we do not have an upper bound on the time a ParSeq plan takes sans application-specific timeouts, and this has worked out reasonably in most cases (we also don't hold a thread, and we aggressively release references to memory). Until we have to deal with multi-tenancy and resource management, my intuition would be to move towards d), but we need good instrumentation on overall latency and latency for pending requests.

        xinyu Xinyu Liu added a comment -

        Chris Pettitt: great feedback! These issues are not very clear in the design doc and need to be answered there. Your last question is exactly what I've been thinking about for a while. In the design doc, window() is handled the same as before, no matter whether there is in-flight message processing at the same time. This leaves the user to handle race conditions if window() relies on the process results. I've been thinking about supporting calling window() only after all current in-flight messages have been processed. This behavior can be controlled by config: if the user sets task.window.concurrent.call=false, the container will wait until all in-flight messages are processed. This solution provides the same semantics as before (at some performance cost), and the user can configure either way for their use case. What do you think?

        For your other questions:

        • I assume the executorService retrieved using SamzaTaskExecutor.getInstance(config) is shared across ThreadedStreamTasks?
          Yes, it's a singleton inside a Samza container.
        • I assume we're not exposing this executor to the user for direct use, but only use it from the StreamTask wrapper (ThreadedStreamTask)?
          Yes, ThreadedStreamTask is a built-in Samza async task, and we are not exposing its thread pool to the user.
        • It could be possible to have StreamTasks and AsyncStreamTasks in the same container (yes?), so we should only need to use the wrapper when both task instanceof StreamTask and job.container.threadcount > 1 hold.
          No, all Samza tasks are the same type. Samza doesn't support heterogeneous task types.
        • Probably Samza already discourages using state from outside of the task / arguments provided to the task, but if not we should make this clear. Even with synchronous tasks there is the potential for multi-threading bugs if more than one thread is used.
          Right, I will add this to the doc.
        • Is the idea of running multiple tasks in one container novel (it seems not to be)? If it is, do we need isolation at the classloader level?
          No, the current Samza container already runs multiple tasks (of the same type).
        • How do we handle the case where the TaskCallback is never invoked?
          Very good question! When I started the design, I was thinking about having a timeout inside the callback impl so we can catch this issue and report it to the callback listener. What do you think?
        • I assume that if we have > task.process.max.inflight.messages for a given task, we stop polling on associated topics? If not, how do we handle receiving messages faster than they can be processed?
          Yes, the container will pause polling further messages and resume once any task is available again.
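        The pause/resume behavior in the last answer can be sketched as a per-task permit gate sized to task.process.max.inflight.messages (a hypothetical illustration, not the actual Samza implementation): the run loop acquires a permit before dispatching, the callback releases it, and polling naturally pauses while the task is saturated.

```java
import java.util.concurrent.Semaphore;

// Illustrative back-pressure gate for one task: at most maxInflight
// messages may be in process() concurrently.
public class InflightGate {
  private final Semaphore permits;

  InflightGate(int maxInflight) { permits = new Semaphore(maxInflight); }

  // Called by the run loop before handing a message to processAsync();
  // blocks (i.e. polling pauses) when the task is saturated.
  void beforeDispatch() throws InterruptedException { permits.acquire(); }

  // Called from the task callback (complete or failure); resumes polling.
  void afterCompletion() { permits.release(); }

  boolean saturated() { return permits.availablePermits() == 0; }

  public static void main(String[] args) throws InterruptedException {
    InflightGate gate = new InflightGate(2);
    gate.beforeDispatch();
    gate.beforeDispatch();
    System.out.println("saturated: " + gate.saturated());
    gate.afterCompletion();
    System.out.println("saturated: " + gate.saturated());
  }
}
```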
        cpettitt-linkedin Chris Pettitt added a comment -

        Taking another look at my notes, I think most of the remaining items are opportunities for clarification in the design or are due to missing Samza context:

        • I assume the executorService retrieved using SamzaTaskExecutor.getInstance(config) is shared across ThreadedStreamTasks?
        • I assume we're not exposing this executor to the user for direct use, but only use it from the StreamTask wrapper (ThreadedStreamTask)?
        • It could be possible to have StreamTasks and AsyncStreamTasks in the same container (yes?), so we should only need to use the wrapper when both task instanceof StreamTask and job.container.threadcount > 1 hold.
        • Probably Samza already discourages using state from outside of the task / arguments provided to the task, but if not we should make this clear. Even with synchronous tasks there is the potential for multi-threading bugs if more than one thread is used.
        • Is the idea of running multiple tasks in one container novel (it seems not to be)? If it is, do we need isolation at the classloader level?
        • How do we handle the case where the TaskCallback is never invoked?
        • I assume that if we have > task.process.max.inflight.messages for a given task, we stop polling on associated topics? If not, how do we handle receiving messages faster than they can be processed?
        • The status quo diagram in section 3.1 seems to imply that the thread that calls process subsequently calls window (if necessary) before calling commit? Am I reading that correctly? If so, and if the diagram in 3.2, Option 2 is correct (with the invocation of the callback calling commit directly), is it possible to have data races between process and window for customers who are relying on the current behavior?
        cpettitt-linkedin Chris Pettitt added a comment -

        Here are a few smaller suggestions:

        • Section 1: "where we have message 1 and 2 in the pending queue for task 1 and task2". I would reword this to something like: "where we have message 1 in the pending queue for task 1 and message 2 in the pending queue for task 2". I believe this more closely matches the intent based on the following sentence.
        • Section 1: "Task 1 will process message 1, and until its completion task 2 can*not* process message 2".
        • Section 2.2: Advantage: Proposed rewording: "It allows parallel processing of messages in a single task."
        • Section 2.2: Limitation: It also introduces a potential need to coordinate state (either in memory, or "local persisted"), regardless of ordering dependencies.

        I have some larger potential issues to add later today, but I'm out of time and want to get you some easy stuff in the meantime.

        xinyu Xinyu Liu added a comment -

        Updated the design proposal according to Chris's comments. I changed the terminology in the use cases to clarify the different kinds of parallel processing, and made a more detailed figure for option 2 in section 3.2, which shows how the callback-based model works. Please chime in with your feedback.

        xinyu Xinyu Liu added a comment -

        @Chris: Thanks a lot for catching the issues in the design doc! To your points:

        For 1), right, there should be only one ParSeq engine for the container. I think the code I put there is misleading: it looks like each task is building its own engine. It should be a singleton inside the Samza container. I will update the design doc to make this clear.

        For 2), the design is to let the user run their own underlying executor, while Samza does the callback handling. So for an AsyncStreamTask running on ParSeq, we should be able to get all the benefits from it. I will read more about the visualization doc and see how it can be used in Samza.

        For 3), you're exactly right. I should catch Throwable instead of Exception. I will update the design doc for the fix too.

        cpettitt-linkedin Chris Pettitt added a comment -

        I like the design. A callback-based approach makes sense as the lowest-level primitive and is easy to use as a bridge to other libraries, such as ParSeq.

        Here are a few initial thoughts:

        AsyncStreamTask doesn't seem like the right place to construct a ParSeq engine. The reason is that ParSeq is designed to manage tasks across cores on the system. If each task is running its own engine you need to start thinking about what tasks might be co-located in the same container / host when configuring ParSeq. Instead it is preferable to create a single engine as a part of container startup.

        It might be worth considering using ParSeq as the underlying executor, or at least stealing some of the best ideas from it. For example, ParSeq's visualization has turned out to be extremely useful in tracking down production issues (https://github.com/linkedin/parseq/wiki/Tracing#graphviz-view).

        Another interesting idea is that of task fusion. In ParSeq we're often fanning out requests and then trying to join the results, which results in one or more stages of sequential computation. A naive approach would be to reschedule each stage of computation, but then we lose out on cache locality and add some scheduling overhead. Instead you can detect that the work is synchronous and fuse some set of those stages, running them immediately before rescheduling the plan. This becomes even more interesting in the context of larger workflows where we tie together a graph of "simple" transforms and decide to run some subset of the graph with fusion.

        One other consideration that may or may not help is that you do not need to worry about multi-threading in the context of a ParSeq plan - each task "happens before" subsequent tasks in the plan, even if they are executed on different cores.

        A minor comment on ThreadedStreamTask: you might want to catch Throwable there (especially if you're going to shut down the job / container on failure). Last time I looked, JUnit and the like use Errors (vs. Exceptions), so if something goes wrong the thread will die and you won't get a signal as to what happened.
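        The Throwable point can be illustrated with a small wrapper (illustrative only; this is not the actual ThreadedStreamTask code): catching Throwable instead of Exception means Errors such as AssertionError or OutOfMemoryError still reach the failure handler rather than silently killing the pool thread.

```java
import java.util.function.Consumer;

// Sketch: run task work on a worker thread and route ALL failures,
// including Errors, to the failure handler.
public class CatchAllRunner {
  static void runWrapped(Runnable work, Consumer<Throwable> onFailure) {
    try {
      work.run();
    } catch (Throwable t) {  // Throwable covers Error as well as Exception
      onFailure.accept(t);
    }
  }

  public static void main(String[] args) {
    // An AssertionError would escape a `catch (Exception e)` block entirely.
    runWrapped(() -> { throw new AssertionError("boom"); },
               t -> System.out.println("caught: " + t));
  }
}
```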

        xinyu Xinyu Liu added a comment -

        Initial design proposal attached.


          People

          • Assignee:
            xinyu Xinyu Liu
            Reporter:
            xinyu Xinyu Liu
          • Votes:
            1
          • Watchers:
            10
