Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.5.0
    • Fix Version/s: 0.6
    • Labels:
      None

      Description

      Currently, each Stream has one Thread in charge of processing the incoming events on the appropriate PE. If one PE blocks its execution while processing an event, the whole Stream is blocked. The current workaround is for a PE to manage its own async thread, which I don't think is a nice solution.

      It would be better to have a configurable number of threads take care of executing the incoming events.

      Attachments

      1. S4-62-multithreaded-streams.patch (16 kB) - Daniel Gómez Ferro
      2. S4-62-multithreaded-streams.patch (20 kB) - Daniel Gómez Ferro


          Activity

          Daniel Gómez Ferro added a comment -

          This patch introduces an ExecutorService per Stream with a configurable maximum number of threads (default is 1).

          A few tests are included.
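
          For illustration, here is a minimal sketch of the idea (hypothetical names, not the actual patch code): the stream hands each incoming event to its own executor instead of processing it on a single dedicated thread.

          {code:java}
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          // Hypothetical sketch of a stream dispatching events to a pool
          // instead of processing them on one dedicated thread.
          public class MultithreadedStream<T> {

              private final ExecutorService executor;

              public MultithreadedStream(int maxThreads) {
                  // maxThreads defaults to 1, preserving the original single-threaded behavior
                  this.executor = Executors.newFixedThreadPool(maxThreads);
              }

              public void receive(final T event) {
                  executor.submit(new Runnable() {
                      public void run() {
                          process(event); // deliver to the appropriate PE
                      }
                  });
              }

              protected void process(T event) {
                  // look up the target PE by key and invoke it
              }

              public void close() {
                  executor.shutdown();
              }
          }
          {code}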

          Flavio Junqueira added a comment -

           When you say that a PE blocks upon an event, are you referring to waiting on I/O or something similar, or simply to being CPU intensive? In general, I would think that the network is the primary bottleneck, but I'm not really sure what it is for S4. Has anyone verified it?

          Having PEs that block upon an event does not sound like a good idea, but it would be interesting to see a use case if you or anyone else is aware of any.

          Daniel Gómez Ferro added a comment -

           My original use case has to do with waiting for I/O. Suppose you are receiving raw URLs and you want to resolve them (in case they are shortened). You have to either resolve them from your PE, use a separate service, or query a database, all of which block your pipeline.
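
           As a concrete illustration of the blocking step (a sketch, not code from the patch; the method is hypothetical), resolving a shortened URL means issuing an HTTP request and reading the redirect target, which stalls the calling thread:

           {code:java}
           import java.net.HttpURLConnection;
           import java.net.URL;

           // Sketch: resolve a possibly shortened URL by reading one redirect.
           public static String resolve(String shortUrl) throws Exception {
               HttpURLConnection conn = (HttpURLConnection) new URL(shortUrl).openConnection();
               conn.setInstanceFollowRedirects(false);
               conn.setRequestMethod("HEAD");
               int code = conn.getResponseCode(); // blocking network round trip, stalls the stream's thread
               if (code >= 300 && code < 400) {
                   String location = conn.getHeaderField("Location");
                   return location != null ? location : shortUrl;
               }
               return shortUrl;
           }
           {code}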

          As I said, the only alternative I could think of was starting an async Thread from the PE to do this task, but I think this solution is much better.

          If anyone knows a different solution please share. Maybe S4 is not the right tool for the job, but overall I feel it matches nicely.

          Matthieu Morel added a comment -

           Ideally, there should not be any blocking operations in PEs. However, this only holds if you don't need any external data, or if you can load all external data into memory upon initialization. In some cases this is not possible, and I/O will be needed. Retrieved data can then be cached. It's much better if the stream is not blocked (provided you don't need ordering), and we certainly don't want to let PEs manage threads internally (that would mess up the lifecycle management of PEs).
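
           As a sketch of the caching idea (hypothetical names), so that in the common case a given lookup blocks only on its first occurrence:

           {code:java}
           import java.util.Map;
           import java.util.concurrent.ConcurrentHashMap;

           // Sketch: cache retrieved data so repeated events don't redo the I/O.
           // Note: concurrent misses on the same key may still fetch twice.
           public class ResolverCache {

               private final Map<String, String> cache = new ConcurrentHashMap<String, String>();

               public String resolve(String url) throws Exception {
                   String resolved = cache.get(url);
                   if (resolved == null) {
                       resolved = fetchOverNetwork(url); // blocking I/O, amortized by the cache
                       cache.put(url, resolved);
                   }
                   return resolved;
               }

               private String fetchOverNetwork(String url) throws Exception {
                   return url; // e.g. the HEAD-request resolution sketched earlier
               }
           }
           {code}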

           In addition, parallelizing the processing of streams may also help increase throughput. It is something we had been contemplating, but we preferred to leave it for a later release. Daniel's proposal is a good start, with reasonable defaults, and it remains optional. We could later improve this mechanism and provide more flexibility / performance.

          Daniel Gómez Ferro added a comment -

          I'm updating the tests to reuse the classes introduced in S4-63, which is more likely to get integrated soon.

          Daniel Gómez Ferro added a comment -

           The tests depend on the classes introduced in that issue.

          Flavio Junqueira added a comment -

           I just had a chat with Daniel about this. My first reaction here was to think that PEs shouldn't be blocking on I/O because you want to keep the processing of streams flowing. After seeing the use case, I realized that it is a valid one and that there could be more like it.

          I was then wondering what the right way of dealing with such cases is. One high-level idea is to move the processing of I/O requests out of the critical path so that we can keep processing events. We could have a thread pool on the side just to deal with such requests, but this would be effective only if not all events require blocking on I/O.
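
           A minimal sketch of that idea (hypothetical names): events needing I/O are handed to a bounded side pool, and their results are re-injected into the pipeline when they complete.

           {code:java}
           import java.util.concurrent.ExecutorService;
           import java.util.concurrent.Executors;

           // Sketch: keep the stream thread free by offloading blocking requests
           // to a side pool and emitting the result asynchronously.
           public class IoOffloader {

               private final ExecutorService ioPool = Executors.newFixedThreadPool(8);

               public void handle(final String url) {
                   ioPool.submit(new Runnable() {
                       public void run() {
                           String resolved = blockingResolve(url); // slow I/O, off the critical path
                           emitDownstream(resolved);               // re-inject as a new event
                       }
                   });
               }

               private String blockingResolve(String url) { return url; }

               private void emitDownstream(String resolved) { /* send to the next PE */ }
           }
           {code}

           As noted above, if every event blocks, the pool's queue grows at the arrival rate and the offloading buys nothing.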

          If anyone has more thoughts on how to deal with such cases, I would be interested in hearing. This discussion goes beyond one vs. multiple threads.

          Daniel Gómez Ferro added a comment -

          We could have a thread pool on the side just to deal with such requests, but this would be effective only if not all events require blocking on I/O.

          Matthieu and I also discussed this and realized that it would be equivalent to having a PE on the side doing such requests. You could split the incoming stream into one going to a PE doing blocking requests and another stream to a non-blocking PE, effectively moving the I/O out of the critical path.
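
           Schematically (the API below is purely illustrative, not S4's actual one), the split amounts to a routing decision:

           {code:java}
           // Illustrative routing only; Stream and Event are stand-ins, not the S4 API.
           public class RouterPE {

               private Stream<Event> blockingPath;    // consumed by a PE that may block on I/O
               private Stream<Event> nonBlockingPath; // consumed by a PE that never blocks

               public void onEvent(Event event) {
                   if (needsIo(event)) {
                       blockingPath.put(event);    // I/O happens off the critical path
                   } else {
                       nonBlockingPath.put(event); // the fast path stays unblocked
                   }
               }

               private boolean needsIo(Event event) {
                   return true; // application-specific, e.g. "is this URL shortened?"
               }

               interface Stream<T> { void put(T event); }
               interface Event {}
           }
           {code}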

          If anyone has more thoughts on how to deal with such cases, I would be interested in hearing. This discussion goes beyond one vs. multiple threads.

          Yes, it would be really nice to know if anybody has run into this kind of issue while developing S4 applications. How did you avoid it?

          Matthieu Morel added a comment -

           Matt Welsh's feedback on SEDA ( http://matt-welsh.blogspot.com/2010/07/retrospective-on-seda.html ) could be taken into account. S4 follows a similar stage-driven architecture. While it might be difficult to scale out by grouping stages into a single "thread pool domain" as suggested in the blog post, an interesting recommendation is to put "a separate thread pool and queue in front of a group of stages that have long latency or nondeterministic runtime, such as performing disk I/O." This corresponds to the proposal of this ticket.

          The platform could even provide this feature automatically when streams or PEs are identified as I/O consumers.

          Flavio Junqueira added a comment -

          In ZooKeeper we do pipelining too, and it is essentially the stage-driven approach you're referring to. The request processors would be the stages.

           I still feel that the case here is somewhat different. In ZooKeeper, and I believe in SEDA too, we throttle when load is high, which halts the submission of new requests until the load drops.

           We can't stop streams. If we throttle, the end result will be dropping events, perhaps many of them. Consequently, it does not sound great to have blocking I/O operations in the critical path. It is unclear to me whether adding thread pools would help: depending on the arrival rate and the service rate, we may end up saturating the pool very fast.

          Matthieu Morel added a comment -

           S4-95 enables this approach in an optional fashion, configured at the application layer.

           Concretely, the API allows specifying the parallelism of a given stream.

           Depending on the underlying executor implementation, those threads may be created when the work queue fills up, and they may time out when unused. This behavior can be customized/overridden.
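
           For reference, that is the standard java.util.concurrent.ThreadPoolExecutor behavior; a sketch with example values (the numbers are illustrative, not S4 defaults):

           {code:java}
           import java.util.concurrent.ArrayBlockingQueue;
           import java.util.concurrent.ThreadPoolExecutor;
           import java.util.concurrent.TimeUnit;

           public class StreamExecutorExample {
               public static void main(String[] args) {
                   // Threads beyond the core size are created only once the work
                   // queue is full; idle extra threads time out after 60 seconds.
                   ThreadPoolExecutor executor = new ThreadPoolExecutor(
                           1,                                       // core pool size
                           8,                                       // max pool size (the stream's parallelism)
                           60, TimeUnit.SECONDS,                    // idle timeout for non-core threads
                           new ArrayBlockingQueue<Runnable>(1000)); // bounded work queue
                   executor.allowCoreThreadTimeOut(true);           // optionally let core threads time out too
               }
           }
           {code}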

          Matthieu Morel added a comment -

           Implemented by S4-95. Parallelism can be specified in the App, and the implementation is pluggable via a StreamExecutorServiceFactory.
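
           A sketch of what such a factory could look like (the interface shape here is assumed, not copied from S4-95):

           {code:java}
           import java.util.concurrent.ExecutorService;
           import java.util.concurrent.Executors;

           // Assumed shape of the factory; the actual interface in S4-95 may differ.
           interface StreamExecutorServiceFactory {
               ExecutorService create(int parallelism, String streamName);
           }

           // One possible plug-in: a plain fixed-size pool per stream.
           class FixedPoolFactory implements StreamExecutorServiceFactory {
               public ExecutorService create(int parallelism, String streamName) {
                   return Executors.newFixedThreadPool(parallelism);
               }
           }
           {code}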


            People

            • Assignee: Unassigned
            • Reporter: Daniel Gómez Ferro
            • Votes: 0
            • Watchers: 3

