
SAMZA-245: Improve SystemConsumers performance

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.8.0
    • Component/s: container
    • Labels:
      None

      Description

      As part of SAMZA-220, a more radical patch was proposed. This patch appears to improve SystemConsumers' performance pretty significantly, while also reducing its complexity. The decision was made to move this change into the 0.8.0 release, rather than the 0.7.0 release, since it's a fairly risky change.

      This ticket is to explore updating SystemConsumers to eliminate almost all loops in order to increase performance in the Samza container.

      1. org.apache.samza.test.performance.TestSamzaContainerPerformance.master.html
        1.58 MB
        Chris Riccomini
      2. org.apache.samza.test.performance.TestSamzaContainerPerformance.SAMZA-245-3.html
        1.58 MB
        Chris Riccomini
      3. SAMZA-245.0.patch
        26 kB
        Chris Riccomini
      4. SAMZA-245-1.patch
        58 kB
        Chris Riccomini
      5. SAMZA-245-3.patch
        61 kB
        Chris Riccomini
      6. SAMZA-245-4.patch
        70 kB
        Chris Riccomini
      7. SAMZA-245-5.patch
        77 kB
        Chris Riccomini
      8. SAMZA-245-6.patch
        77 kB
        Chris Riccomini
      9. SAMZA-245-7.patch
        77 kB
        Chris Riccomini

          Activity

          criccomini Chris Riccomini added a comment - edited

          Attaching initial patch. The RB is up at:

          https://reviews.apache.org/r/23588/

          1. Change the API for SystemConsumer to return messages grouped by SSP.
          2. Change the API for SystemConsumer to eliminate the max fetch size (see the sketch after this list).
          3. Update SystemStreamPartitionIterator to use new API defined in (1) above.
          4. Update BlockingEnvelopeMap to use the new API described in (1) above.
          5. Switch SystemConsumers to use all java.util data structures.
          6. Eliminate all fetch thresholds, max buffer size, etc., since we don't need them due to (2) above.
          7. Eliminate the neededByChooser and skippingChooser variables, and introduce a map to track empty SystemStreamPartitions grouped by system. The grouping is due to (2) above.
          8. Introduce a threshold in SystemConsumers.choose to trigger polling, so we don't poll all system consumers on every choose invocation. Currently the threshold is hard-coded to 1000.
          9. Eliminate doubling backoff, since we are already throttling how often we poll using the threshold in (8).
          10. Use an iterator instead of Scala for loops to speed things up.
          11. Update MessageChooser whenever it picks a message, and also whenever a message arrives for a previously empty SSP.
          12. Strip RoundRobinChooser of its safety checks, because they were taking 14% of CPU in the + method of Scala's Set.
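
          A minimal sketch of what the grouped-by-SSP API in (1) and (2) might look like; the generic parameters are placeholders for SystemStreamPartition and IncomingMessageEnvelope, and the authoritative signature is the one on the RB:

          import java.util.List;
          import java.util.Map;
          import java.util.Set;

          // Hedged sketch: poll() returns all available messages grouped by SSP,
          // with no caller-supplied max fetch size.
          interface GroupedPollSketch<SSP, Envelope> {
            Map<SSP, List<Envelope>> poll(Set<SSP> systemStreamPartitions, long timeout)
                throws InterruptedException;
          }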

          I think this set of changes maintains the contract for the MessageChooser, namely, one SSP update at a time. I ran some preliminary tests, and it didn't seem to receive any duplicates. The trick is that we only add the SSP to the empty map after we've checked it once in the choose method. This is normally the point where we'd add it to the skippingChooser set and remove it from the neededByChooser set. Instead, we just add it to the empty buffer, and never try to update again until a new envelope arrives for the SSP in question.
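
          A rough sketch of that invariant, with hypothetical names (String stands in for the real SSP and envelope types; this is not the patch itself):

          import java.util.HashMap;
          import java.util.HashSet;
          import java.util.Map;
          import java.util.Queue;
          import java.util.Set;

          // At most one envelope per SSP is in flight to the chooser, and an SSP
          // joins the empty set only after choose() finds its buffer drained.
          final class ChooserFeedSketch {
            private final Map<String, Queue<String>> unprocessedMessages = new HashMap<>();
            private final Set<String> emptySsps = new HashSet<>();

            /** Offer the next envelope for this SSP to the chooser, or park the SSP. */
            void refeed(String ssp) {
              Queue<String> buffered = unprocessedMessages.get(ssp);
              if (buffered != null && !buffered.isEmpty()) {
                updateChooser(ssp, buffered.poll()); // exactly one envelope in flight
              } else {
                emptySsps.add(ssp); // not updated again until a new envelope arrives
              }
            }

            private void updateChooser(String ssp, String envelope) {
              System.out.println("chooser.update(" + ssp + ": " + envelope + ")");
            }
          }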

          Obviously I need to update the tests and write some more to verify that all of our logic still holds.

          criccomini Chris Riccomini added a comment -

          Taking up this patch. Starting with a rebase, since the code has changed considerably since the last time this patch was applied.

          Please have a look and provide feedback on the work that's being done. Some of these changes are pretty big (and API incompatible).

          criccomini Chris Riccomini added a comment -

          Attaching rebased patch. All tests pass. The RB is available at:

          https://reviews.apache.org/r/23588/

          1. Rebased original patch.
          2. Fixed tests to compile and pass again.
          3. Found a bug in the original patch, where dropping messages that failed deserialization could lead to never consuming from the stream again; fixed that.

          One thing to discuss here is the refreshThreshold. We need a way to trigger polling, so I introduced this as a global lower bound that defines when we'll start polling systems for more messages. The problem is that this concept conflicts with the TieredPriorityChooser (see SAMZA-342), where we might want to consume real-time messages immediately, even while processing batch messages. In such a case, with this patch, the batch messages would fill the buffer and cause the real-time streams not to be polled until all of the batch messages are processed. One workaround would be to raise refreshThreshold to a very high number, so that you're always polling, but perhaps there's a better solution? Maybe stream-specific polling thresholds, or something.

          criccomini Chris Riccomini added a comment -

          Perhaps a better strategy for determining when we poll systems is to just make it time-based. We could define something like maxPollLatencyMs, which would define the maximum amount of time we'd go between system polls. The SystemConsumers would then either poll when there's no more data for any stream, or when the max poll latency has expired. If the max poll latency has expired, only streams with no data will be polled.
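
          A minimal sketch of that time-based trigger, assuming a single lastPollMs field and illustrative names:

          // Poll when no stream has buffered data, or when maxPollLatencyMs has expired.
          final class PollTimerSketch {
            private final long maxPollLatencyMs;
            private long lastPollMs;

            PollTimerSketch(long maxPollLatencyMs) {
              this.maxPollLatencyMs = maxPollLatencyMs;
              this.lastPollMs = System.currentTimeMillis();
            }

            boolean shouldPoll(boolean anyStreamHasBufferedData) {
              long now = System.currentTimeMillis();
              if (!anyStreamHasBufferedData || now - lastPollMs >= maxPollLatencyMs) {
                lastPollMs = now;
                return true;
              }
              return false;
            }
          }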

          criccomini Chris Riccomini added a comment -

          Attaching an updated patch. The RB is in the same spot.

          1. Switched from refreshThreshold to pollIntervalMs. See SAMZA-342 for details.
          2. Tweaked some logic in update to ignore multiple deserialization failures rather than giving up after one try.
          3. Skip calling consumers(systemName) in poll() until we actually need it.
          criccomini Chris Riccomini added a comment -

          Attaching initial performance test results.

          I ran:

          $ ./gradlew :samza-test:test -Dsamza.test.single=TestSamzaContainerPerformance
          

          On my MacBook Air:

            Model Name:	MacBook Air
            Model Identifier:	MacBookAir5,2
            Processor Name:	Intel Core i7
            Processor Speed:	2 GHz
            Number of Processors:	1
            Total Number of Cores:	2
            L2 Cache (per Core):	256 KB
            L3 Cache:	4 MB
            Memory:	8 GB
          

          With the SAMZA-245-3 patch applied: after the initial warm-up (~5s), the test processes 9,830,000 messages in 13s (18s - 5s). This comes out to ~756,000 messages/sec (9,830,000/13).

          With master: there doesn't appear to be much warm-up. The test processes 10,000,000 messages in 37s, which comes out to ~270,000 messages/sec (10,000,000/37). If you discount the first 5 seconds to make it fair, the test processes ~273,000 messages/sec (8,740,000/32).

          Given the 2.8x difference in performance, I'm a bit concerned that I've broken something. All of the existing tests pass, but I'm going to write some SystemConsumers unit tests to validate things.

          closeuris Yan Fang added a comment -

          It seems I have a problem running the same test. I downloaded master and ran the same command line on my Mac as you did, but actually got some errors.

          samza-test/build/test-results/TEST-org.apache.samza.test.performance.TestSamzaContainerPerformance.xml has errors. It says

          141421 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 10000 messages in 0 seconds.
          141493 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 20000 messages in 0 seconds.
          141530 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 30000 messages in 0 seconds.
          141554 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 40000 messages in 0 seconds.
          141581 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 50000 messages in 0 seconds.
          141604 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 60000 messages in 0 seconds.
          141627 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 70000 messages in 0 seconds.
          141649 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 80000 messages in 0 seconds.
          141678 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 90000 messages in 0 seconds.
          141745 [ThreadJob] INFO org.apache.samza.test.performance.TestPerformanceTask - Processed 100000 messages in 0 seconds.
          

          I also tested in my VM, which shows similar output, except that "0 seconds" becomes "1 second"...

          Did I miss something?

          criccomini Chris Riccomini added a comment -

          Sorry, I should have mentioned that I patched master to make the test fair. The reason you're seeing only 100,000 messages is that the patch modifies TestPerformanceTask.scala and TestSamzaContainerPerformance.scala to increase the duration of the test. I copy/pasted those two changes into master in order to make the test the same for both branches.

          criccomini Chris Riccomini added a comment -

          Jakob Homan, you reviewed the wrong RB. Please see:

          https://reviews.apache.org/r/23588/

          criccomini Chris Riccomini added a comment -

          Closed the old RB.

          criccomini Chris Riccomini added a comment -

          Attaching latest patch with unit tests.

          I can't seem to prove to myself that the code is broken. All tests pass. Unfortunately, since we don't have JaCoCo anymore, I don't know test coverage for the SystemConsumers class, but I think it's pretty close to 100%.

          criccomini Chris Riccomini added a comment -

          Can I get a review from someone on this?

          tgiuli TJ Giuli added a comment -

          Hey, Chris Riccomini, definitely +1 on making polling latency configurable. For my own applications of Samza, I find myself mixing real-time streams with batch streams in the same stream processor (SAMZA-342), so I'd ideally be able to set a max polling interval on a per-stream basis.

          criccomini Chris Riccomini added a comment -

          I'd ideally be able to set a max polling interval on a per-stream basis.

          The way things work with the current patch is that the poll interval is set for the job, but the containers only poll for "empty" stream partitions (i.e. those for which there are no more messages available to process). If you have one batch stream partition and one real-time stream partition, then the poll interval will cause the real-time stream partition to be polled every 50ms (default in the patch), while the batch stream partition will only be polled when it becomes empty again.
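
          A sketch of that selective polling, with assumed names: a partition that still has buffered messages is never re-fetched on a poll tick.

          import java.util.HashMap;
          import java.util.HashSet;
          import java.util.Map;
          import java.util.Set;

          final class SelectivePollSketch {
            // System name -> partitions that currently have nothing buffered.
            private final Map<String, Set<Integer>> emptyPartitionsBySystem = new HashMap<>();

            void markEmpty(String system, int partition) {
              emptyPartitionsBySystem
                  .computeIfAbsent(system, s -> new HashSet<>())
                  .add(partition);
            }

            void pollTick() {
              emptyPartitionsBySystem.forEach((system, partitions) -> {
                if (!partitions.isEmpty()) {
                  // Fetch only these partitions from this system's consumer.
                  System.out.println("poll " + system + " for partitions " + partitions);
                }
              });
            }
          }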

          criccomini Chris Riccomini added a comment -

          My latest patch has an unprocessedMessages map, which it uses as a buffer. When the SystemConsumers class needs more messages for a set of SystemStreamPartitions, it asks the underlying SystemConsumer for all messages for these SystemStreamPartitions. The messages are then buffered in the unprocessedMessages map in SystemConsumers, and doled out to the MessageChooser over time.

          An alternative implementation would be to shove all of the buffering logic into the SystemConsumer implementations. In this case, you'd have the SystemConsumer.poll API look like this:

          List<IncomingMessageEnvelope> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException;
          

          Rather than having the poll method return all available messages for the supplied SystemStreamPartitions, it would only return at most one IncomingMessageEnvelope per SystemStreamPartition.

          This tweak simplifies the SystemConsumers implementation slightly (LoC goes from 315 to 280) because we no longer need an unprocessedMessages buffer in it. We simply call SystemConsumer.poll() immediately whenever the MessageChooser needs a new message, rather than first taking from the unprocessedMessages buffer. It also allows us to eliminate the "update" method.

          The trade-off for this slightly simpler SystemConsumers implementation is that it pushes the buffering complexity into individual SystemConsumer implementations. This is a double-edged sword. On the one hand, it allows us to potentially write different buffering strategies for different systems. I can't think of any use cases for this off the top of my head, but there might be some. On the other hand, it means that each individual system has its own way of configuring its buffer (and buffer refresh rates), which seems like it would be confusing to end-users. In addition, it also seems a bit scary, since poll() might be high overhead for some system implementations. If we poll aggressively every time we need a message, this could lead to bad performance. To get around this, again, the underlying system would have to buffer messages in order to avoid the cost of polling frequently.

          Right now, the trade-off doesn't matter much, since all known implementations use BlockingEnvelopeMap, which already contains the buffering logic. When we convert to the new Kafka 0.9 consumer, this won't be the case anymore. In that case, we'll have to re-implement the unprocessedMessages logic inside the Kafka consumer, since the consumer will fetch N messages, but only be able to return one at a time via the poll() method.

          After implementing a bit of this alternative approach, I don't think the trade-offs are worth it, so I'm sticking with the existing patch. I didn't get as far as doing any performance tests with the tweaked patch because I'm not comfortable with the trade-offs.

          martinkl Martin Kleppmann added a comment -

          Your approach seems reasonable to me. I've added a few comments on the details on RB.

          criccomini Chris Riccomini added a comment -

          I ran the TestSamzaContainerPerformance test for the alternative approach that I outlined above, and it's actually slower (~555,000 msgs/sec) than my current approach (~756,000 msgs/sec).

          closeuris Yan Fang added a comment -

          Tested on my machine:

            Model Name:	MacBook Pro
            Model Identifier:	MacBookPro11,3
            Processor Name:	Intel Core i7
            Processor Speed:	2.3 GHz
            Number of Processors:	1
            Total Number of Cores:	4
            L2 Cache (per Core):	256 KB
            L3 Cache:	6 MB
            Memory:	16 GB
          

          The patch takes 9 seconds for 10,000,000 messages while master takes 25 seconds, which is similar to what Chris got on his machine: a 2-3x improvement. I ran another test with the message count increased to 100,000,000; the patch takes 114 seconds while master takes 364 seconds. From the second test, the improvement is more than 3x.

          Also, the same concern as Martin's: will we run out of memory if processing is slower than the polling interval?

          criccomini Chris Riccomini added a comment -

          Attaching latest patch to address Martin and Yan's comments. All changes made as requested.

          criccomini Chris Riccomini added a comment -

          Also, the same concern as Martin's: will we run out of memory if processing is slower than the polling interval?

          I don't think this should happen. We only add SystemStreamPartitions to the emptySystemStreamPartitionsBySystem variable when the unprocessedMessages buffer is empty for the SSP (i.e. the update method returns false for the SSP):

                if (!update(systemStreamPartition)) {
                  emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
                }
          

          Then, we remove the SSP from emptySystemStreamPartitionsBySystem whenever we get at least one message for it:

                  if (numEnvelopes > 0) {
                    unprocessedMessages.put(systemStreamPartition, envelopes)
          
                    // Update the chooser if it needs a message for this SSP.
                    if (emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).remove(systemStreamPartition)) {
                      update(systemStreamPartition)
                    }
                  }
          

          In this way, we should only poll for SSPs that are totally empty. Thus the theoretical max memory used is (max messages per SSP) * (number of SSPs) * 2. The max messages per SSP is defined by the underlying consumer; in Kafka's case, it's defined by the fetch size. The number of SSPs is simply a function of task.inputs' input streams. The factor of 2 is required because we have two buffers: BlockingEnvelopeMap's queue, and SystemConsumers' unprocessedMessages buffer.

          If a StreamTask is processing slower than the messages are coming in, SystemConsumers should not poll for any new messages since all incoming SystemStreamPartitions have messages in unprocessedMessages. If one or more SystemStreamPartitions have empty buffers in unprocessedMessages, then ONLY those SSPs should be refreshed when the pollIntervalMs expires.
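
          To make that bound concrete with purely illustrative numbers: if the Kafka fetch size yields at most 10,000 buffered messages per SSP and a container consumes 16 SSPs, the worst case is 10,000 * 16 * 2 = 320,000 buffered envelopes across BlockingEnvelopeMap's queues and the unprocessedMessages buffer.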

          criccomini Chris Riccomini added a comment -

          So, after the latest patch, I re-ran the perf test, and it's slow (~40s for 10,000,000 messages). This is much worse than my prior patches. I'm wondering if the list.remove call is much slower than the queue.poll call. Investigating.

          criccomini Chris Riccomini added a comment -

          Looks like switching from ArrayList to LinkedList in BlockingEnvelopeMap speeds things up a bit (~18s for 10,000,000 messages).

          criccomini Chris Riccomini added a comment -

          Switching back to the Queue interface, it takes 12s again. I'm inclined to leave the interface as a Queue instead of a List, since this is exactly how we're using it. Thoughts?

          criccomini Chris Riccomini added a comment -

          Dug into this more. The fastest approach seems to be to leave List as the API, use LinkedList in BlockingEnvelopeMap, and use new ArrayDeque(polledList) in SystemConsumers.

          No changes: ~40s
          LinkedList: ~18s
          LinkedList+ArrayDeque: ~12s

          At 12s, we're doing 833,333 messages/sec. I'm also no longer seeing that "warm up" that I mentioned earlier.
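
          For illustration, a self-contained sketch of that winning combination (String stands in for IncomingMessageEnvelope): the producer side buffers into a LinkedList, and the consumer side copies it once into an ArrayDeque so that repeated head removal is O(1).

          import java.util.ArrayDeque;
          import java.util.LinkedList;
          import java.util.List;
          import java.util.Queue;

          final class DrainSketch {
            public static void main(String[] args) {
              List<String> polledList = new LinkedList<>();
              polledList.add("envelope-1");
              polledList.add("envelope-2");

              Queue<String> unprocessed = new ArrayDeque<>(polledList); // one O(n) copy
              while (!unprocessed.isEmpty()) {
                System.out.println(unprocessed.poll()); // O(1) per message
              }
            }
          }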

          criccomini Chris Riccomini added a comment -

          Attaching latest patch. Left SystemConsumer.poll API using List, but changed SystemConsumers to convert the list to an ArrayDeque when it is received. BlockingEnvelopeMap is using LinkedList now.

          criccomini Chris Riccomini added a comment -

          Switching BlockingEnvelopeMap back to an ArrayList that's properly initialized to the size of the queue that is going to drain into it.

          criccomini Chris Riccomini added a comment -

          Latest patch is running at ~12s for 10,000,000 messages. Uses ArrayList in BlockingEnvelopeMap, and ArrayDeque in SystemConsumers.

          Oddly, switching the SystemConsumer.poll API to use Queue and using ArrayDeque in BlockingEnvelopeMap is actually slower (~19s).

          criccomini Chris Riccomini added a comment -

          K, ready for next batch of reviews now.

          criccomini Chris Riccomini added a comment -

          Also, did some CPU sampling to see how CPU cycles are spent after this patch. About 40% of CPU is spent inside SystemStreamPartition.hashCode inside OffsetManager.update. I think we can optimize this in a separate patch, but it's good to know that there's still room to grow.

          By commenting out the OffsetManager.update line, I was able to do 10,000,000 messages in 9 seconds on my little MacBook Air, which puts us over 1 million messages/sec. So, this looks like the next thing to target.
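
          For what it's worth, a standard remedy for a hot hashCode on an immutable key is to compute it once in the constructor. A sketch with a hypothetical class (not Samza's actual SystemStreamPartition):

          import java.util.Objects;

          final class CachedHashKeySketch {
            private final String system;
            private final String stream;
            private final int partition;
            private final int hash; // cached once; all fields are final

            CachedHashKeySketch(String system, String stream, int partition) {
              this.system = system;
              this.stream = stream;
              this.partition = partition;
              this.hash = Objects.hash(system, stream, partition);
            }

            @Override public int hashCode() { return hash; }

            @Override public boolean equals(Object o) {
              if (this == o) return true;
              if (!(o instanceof CachedHashKeySketch)) return false;
              CachedHashKeySketch k = (CachedHashKeySketch) o;
              return partition == k.partition
                  && system.equals(k.system)
                  && stream.equals(k.stream);
            }
          }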

          martinkl Martin Kleppmann added a comment -

          Great work Chris. +1

          criccomini Chris Riccomini added a comment -

          Merged and committed to master

          criccomini Chris Riccomini added a comment -

          Something has been bugging me. Why is it that using an ArrayList in BlockingEnvelopeMap, and converting it to an ArrayDeque in SystemConsumers is faster than just using an ArrayDeque in BlockingEnvelopeMap, and using it directly in SystemConsumers?

          I wrote a little test to see how LinkedList, ArrayList, and ArrayDeque perform on the methods that BlockingEnvelopeMap and SystemConsumers call:

          import java.util.ArrayDeque;
          import java.util.ArrayList;
          import java.util.LinkedList;
          import java.util.List;
          import java.util.Queue;
          import java.util.concurrent.LinkedBlockingQueue;

          // Imports and a wrapper class (arbitrary name) added so the snippet compiles standalone.
          public class CollectionPerfTest {

          public static LinkedBlockingQueue<Integer> loadBlockingQueue(int operations) {
              Integer integer = new Integer(0);
              LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<Integer>();
              for (int operation = 0; operation < operations; ++operation) {
                linkedBlockingQueue.add(integer);
              }
              return linkedBlockingQueue;
            }
          
            public static void main(String[] args) throws InterruptedException {
              int OPERATIONS = 10000000;
              int LOOPS = 3;
              Integer integer = new Integer(0);
              LinkedBlockingQueue<Integer> linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
          
              for (int loop = 0; loop < LOOPS; ++loop) {
                System.out.println("--------- ITERATION " + loop + " ---------");
          
                // ----
                long start = System.currentTimeMillis();
                List<Integer> list = new LinkedList<Integer>();
                System.out.println("list(): " + (System.currentTimeMillis() - start) + "ms");
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  list.add(integer);
                }
          
                System.out.println("list.add: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  list.size();
                }
          
                System.out.println("list.size: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  list.remove(0);
                }
          
                System.out.println("list.remove: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                linkedBlockingQueue.drainTo(list);
          
                System.out.println("list.drainTo: " + (System.currentTimeMillis() - start) + "ms");
          
                linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
          
                // ----
                start = System.currentTimeMillis();
                ArrayList<Integer> arrayList = new ArrayList<Integer>(OPERATIONS);
                System.out.println("arrayList(): " + (System.currentTimeMillis() - start) + "ms");
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  arrayList.add(integer);
                }
          
                System.out.println("arrayList.add: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  arrayList.size();
                }
          
                System.out.println("arrayList.size: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                // for (int operation = 0; operation < OPERATIONS; ++operation) {
                // arrayList.remove(0);
                // }
          
                System.out.println("arrayList.remove: DOES NOT FINISH");
          
                // ----
                arrayList = new ArrayList<Integer>(OPERATIONS);
                start = System.currentTimeMillis();
          
                linkedBlockingQueue.drainTo(arrayList);
          
                System.out.println("arrayList.drainTo: " + (System.currentTimeMillis() - start) + "ms");
          
                linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
          
                // ----
                start = System.currentTimeMillis();
          
                new ArrayDeque<Integer>(arrayList);
          
                System.out.println("new q(arrayList): " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
                Queue<Integer> q = new ArrayDeque<Integer>(OPERATIONS);
                System.out.println("q(): " + (System.currentTimeMillis() - start) + "ms");
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.add(integer);
                }
          
                System.out.println("q.add: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.size();
                }
          
                System.out.println("q.size: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.remove();
                }
          
                System.out.println("q.remove: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.offer(integer);
                }
          
                System.out.println("q.offer: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.poll();
                }
          
                System.out.println("q.poll: " + (System.currentTimeMillis() - start) + "ms");
          
                // ----
                start = System.currentTimeMillis();
          
                linkedBlockingQueue.drainTo(q);
          
                System.out.println("q.drainTo: " + (System.currentTimeMillis() - start) + "ms");
          
                linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
              }
            }
          }
          

          Running this code shows:

          --------- ITERATION 0 ---------
          list(): 0ms
          list.add: 1797ms
          list.size: 4ms
          list.remove: 82ms
          list.drainTo: 1915ms
          arrayList(): 2422ms
          arrayList.add: 37ms
          arrayList.size: 5ms
          arrayList.remove: DOES NOT FINISH
          arrayList.drainTo: 95ms
          new q(arrayList): 75ms
          q(): 79ms
          q.add: 38ms
          q.size: 5ms
          q.remove: 22ms
          q.offer: 48ms
          q.poll: 20ms
          q.drainTo: 113ms
          --------- ITERATION 1 ---------
          list(): 0ms
          list.add: 1387ms
          list.size: 0ms
          list.remove: 61ms
          list.drainTo: 1761ms
          arrayList(): 539ms
          arrayList.add: 46ms
          arrayList.size: 0ms
          arrayList.remove: DOES NOT FINISH
          arrayList.drainTo: 100ms
          new q(arrayList): 98ms
          q(): 63ms
          q.add: 46ms
          q.size: 0ms
          q.remove: 28ms
          q.offer: 48ms
          q.poll: 28ms
          q.drainTo: 102ms
          --------- ITERATION 2 ---------
          list(): 0ms
          list.add: 1360ms
          list.size: 0ms
          list.remove: 65ms
          list.drainTo: 1778ms
          arrayList(): 533ms
          arrayList.add: 47ms
          arrayList.size: 0ms
          arrayList.remove: DOES NOT FINISH
          arrayList.drainTo: 123ms
          new q(arrayList): 93ms
          q(): 78ms
          q.add: 44ms
          q.size: 0ms
          q.remove: 29ms
          q.offer: 51ms
          q.poll: 31ms
          q.drainTo: 122ms
          

          As you can see, the ArrayDeque is faster than, or on par with, everything else on almost every method. These results are backed up by others as well.

          Yet, switching SystemConsumer.poll to return a Queue, and using ArrayDeque in BlockingEnvelopeMap exhibits two odd behaviors:

          1. There is a "startup time" where the first few hundred thousand messages are processed rather slowly (0-5s).
          2. The total messages/sec is the same as, or slightly slower (~5%) than, with the ArrayList.

          I added a bunch of counters to BlockingEnvelopeMap so we can understand what's happening inside it during this performance test:

          public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
              // ...
          
              Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                System.err.println("add: " + addCalls + ", drain: " + drainToCalls + ", timeouts: " + timeoutCalls + ", blocking calls: " + timeRemainingCalls + ", block on outstanding calls: " + blockOnOutstandingCalls + ", polls: " + pollCalls + ", average queue size: " + (totalQueueItems / (float) sspQueuePolls) + ", ssp-queue-polls: " + sspQueuePolls + ", got messages: " + gotMessages);
                }
              });
            }
          
            // ...
          
            int addCalls = 0;
            int drainToCalls = 0;
            int totalQueueItems = 0;
            int pollCalls = 0;
            int timeoutCalls = 0;
            int timeRemainingCalls = 0;
            int blockOnOutstandingCalls = 0;
            int sspQueuePolls = 0;
            int gotMessages = 0;
          
            public Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
              long stopTime = clock.currentTimeMillis() + timeout;
              Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>>();
          
              metrics.incPoll();
              ++pollCalls;
          
              for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
                BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
                Queue<IncomingMessageEnvelope> outgoingList = new ArrayDeque<IncomingMessageEnvelope>(queue.size());
                totalQueueItems += queue.size();
                ++sspQueuePolls;
          
                if (queue.size() > 0) {
                  queue.drainTo(outgoingList);
                  ++drainToCalls;
                } else if (timeout != 0) {
                  IncomingMessageEnvelope envelope = null;
                  ++timeoutCalls;
          
                  // How long we can legally block (if timeout > 0)
                  long timeRemaining = stopTime - clock.currentTimeMillis();
          
                  if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
                    ++blockOnOutstandingCalls;
                    // Block until we get at least one message, or until we catch up to
                    // the head of the stream.
                    while (envelope == null && !isAtHead(systemStreamPartition)) {
                      metrics.incBlockingPoll(systemStreamPartition);
                      envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
                    }
                  } else if (timeout > 0 && timeRemaining > 0) {
                    ++timeRemainingCalls;
                    // Block until we get at least one message.
                    metrics.incBlockingTimeoutPoll(systemStreamPartition);
                    envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
                  }
          
                  // If we got a message, add it.
                  if (envelope != null) {
                    outgoingList.add(envelope);
                    ++addCalls;
                    // Drain any remaining messages without blocking.
                    queue.drainTo(outgoingList);
                    ++drainToCalls;
                  }
                }
          
                if (outgoingList.size() > 0) {
                  messagesToReturn.put(systemStreamPartition, outgoingList);
                }
              }
          
              if (messagesToReturn.size() > 0) {
                ++gotMessages;
              }
          
              return messagesToReturn;
            }
          

          Running with the Queue/ArrayDeque implementation shows:

          add: 0, drain: 4034, timeouts: 2895, blocking calls: 0, block on outstanding calls: 0, polls: 43, average queue size: 1708.5803, ssp-queue-polls: 11683, got messages: 43
          

          Some interesting stats to note:

          1. Every time SystemConsumers polled BlockingEnvelopeMap, there were messages available to process.
          2. SystemConsumers only polled BlockingEnvelopeMap 43 times over an 18s execution window (43/18 ≈ 2.4 times/sec). The rest of the time, there were no empty SSPs in SystemConsumers, so the entire BlockingEnvelopeMap.poll logic was skipped.
          3. No blocking polls were made.
          4. Add was never called (because a blocking poll was never made).

          Based on these metrics, only drainTo and size are called on the data structure, both of which ArrayDeque performs better on in my little test. Yet the ArrayList in BlockingEnvelopeMap leads to faster performance in TestSamzaContainerPerformance.

          I also adjusted the test runtime, number of messages the MockSystemConsumer was emitting, log verbosity, and number of MockSystemConsumer threads. In all cases, the results were the same: ArrayList was slightly faster.

          I must be missing something, but I don't know what. I think the better thing to spend time on now is to investigate speeding up the OffsetManager.update call in TaskInstance.process, which takes over half of the CPU time in the main thread. I'll back off the ArrayList/ArrayDeque investigation for now.

          Show
          criccomini Chris Riccomini added a comment - Something has been bugging me. Why is it that using an ArrayList in BlockingEnvelopeMap and converting it to an ArrayDeque in SystemConsumers is faster than just using an ArrayDeque in BlockingEnvelopeMap and using it directly in SystemConsumers? I wrote a little test to see how LinkedList, ArrayList, and ArrayDeque perform on the methods that BlockingEnvelopeMap and SystemConsumers call:

          import java.util.ArrayDeque;
          import java.util.ArrayList;
          import java.util.LinkedList;
          import java.util.List;
          import java.util.Queue;
          import java.util.concurrent.LinkedBlockingQueue;

          // Wrapper class and imports added so the test compiles standalone.
          public class CollectionBenchmark {
            public static LinkedBlockingQueue<Integer> loadBlockingQueue(int operations) {
              Integer integer = new Integer(0);
              LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<Integer>();

              for (int operation = 0; operation < operations; ++operation) {
                linkedBlockingQueue.add(integer);
              }

              return linkedBlockingQueue;
            }

            public static void main(String[] args) throws InterruptedException {
              int OPERATIONS = 10000000;
              int LOOPS = 3;
              Integer integer = new Integer(0);
              LinkedBlockingQueue<Integer> linkedBlockingQueue = loadBlockingQueue(OPERATIONS);

              for (int loop = 0; loop < LOOPS; ++loop) {
                System.out.println("--------- ITERATION " + loop + " ---------");

                // ---- LinkedList ----
                long start = System.currentTimeMillis();
                List<Integer> list = new LinkedList<Integer>();
                System.out.println("list(): " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  list.add(integer);
                }
                System.out.println("list.add: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  list.size();
                }
                System.out.println("list.size: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  list.remove(0);
                }
                System.out.println("list.remove: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                linkedBlockingQueue.drainTo(list);
                System.out.println("list.drainTo: " + (System.currentTimeMillis() - start) + "ms");
                linkedBlockingQueue = loadBlockingQueue(OPERATIONS);

                // ---- ArrayList ----
                start = System.currentTimeMillis();
                ArrayList<Integer> arrayList = new ArrayList<Integer>(OPERATIONS);
                System.out.println("arrayList(): " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  arrayList.add(integer);
                }
                System.out.println("arrayList.add: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  arrayList.size();
                }
                System.out.println("arrayList.size: " + (System.currentTimeMillis() - start) + "ms");

                // Removing from the head of an ArrayList is O(n) per call, so
                // this loop is left commented out; it effectively never completes.
                // for (int operation = 0; operation < OPERATIONS; ++operation) {
                //   arrayList.remove(0);
                // }
                System.out.println("arrayList.remove: DOES NOT FINISH");

                arrayList = new ArrayList<Integer>(OPERATIONS);
                start = System.currentTimeMillis();
                linkedBlockingQueue.drainTo(arrayList);
                System.out.println("arrayList.drainTo: " + (System.currentTimeMillis() - start) + "ms");
                linkedBlockingQueue = loadBlockingQueue(OPERATIONS);

                // ---- ArrayList -> ArrayDeque conversion ----
                start = System.currentTimeMillis();
                new ArrayDeque<Integer>(arrayList);
                System.out.println("new q(arrayList): " + (System.currentTimeMillis() - start) + "ms");

                // ---- ArrayDeque ----
                start = System.currentTimeMillis();
                Queue<Integer> q = new ArrayDeque<Integer>(OPERATIONS);
                System.out.println("q(): " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.add(integer);
                }
                System.out.println("q.add: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.size();
                }
                System.out.println("q.size: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.remove();
                }
                System.out.println("q.remove: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.offer(integer);
                }
                System.out.println("q.offer: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                for (int operation = 0; operation < OPERATIONS; ++operation) {
                  q.poll();
                }
                System.out.println("q.poll: " + (System.currentTimeMillis() - start) + "ms");

                start = System.currentTimeMillis();
                linkedBlockingQueue.drainTo(q);
                System.out.println("q.drainTo: " + (System.currentTimeMillis() - start) + "ms");
                linkedBlockingQueue = loadBlockingQueue(OPERATIONS);
              }
            }
          }

          Running this code shows:

          --------- ITERATION 0 ---------
          list(): 0ms
          list.add: 1797ms
          list.size: 4ms
          list.remove: 82ms
          list.drainTo: 1915ms
          arrayList(): 2422ms
          arrayList.add: 37ms
          arrayList.size: 5ms
          arrayList.remove: DOES NOT FINISH
          arrayList.drainTo: 95ms
          new q(arrayList): 75ms
          q(): 79ms
          q.add: 38ms
          q.size: 5ms
          q.remove: 22ms
          q.offer: 48ms
          q.poll: 20ms
          q.drainTo: 113ms
          --------- ITERATION 1 ---------
          list(): 0ms
          list.add: 1387ms
          list.size: 0ms
          list.remove: 61ms
          list.drainTo: 1761ms
          arrayList(): 539ms
          arrayList.add: 46ms
          arrayList.size: 0ms
          arrayList.remove: DOES NOT FINISH
          arrayList.drainTo: 100ms
          new q(arrayList): 98ms
          q(): 63ms
          q.add: 46ms
          q.size: 0ms
          q.remove: 28ms
          q.offer: 48ms
          q.poll: 28ms
          q.drainTo: 102ms
          --------- ITERATION 2 ---------
          list(): 0ms
          list.add: 1360ms
          list.size: 0ms
          list.remove: 65ms
          list.drainTo: 1778ms
          arrayList(): 533ms
          arrayList.add: 47ms
          arrayList.size: 0ms
          arrayList.remove: DOES NOT FINISH
          arrayList.drainTo: 123ms
          new q(arrayList): 93ms
          q(): 78ms
          q.add: 44ms
          q.size: 0ms
          q.remove: 29ms
          q.offer: 51ms
          q.poll: 31ms
          q.drainTo: 122ms

          As you can see, ArrayDeque is faster than, or on par with, everything else on almost every method, and these results are backed up by others' benchmarks as well. Yet switching SystemConsumer.poll to return a Queue, and using an ArrayDeque in BlockingEnvelopeMap, exhibits two odd behaviors (the two variants are sketched below):

          1. There is a "startup time" during which the first few hundred thousand messages are processed rather slowly (0-5s).
          2. The total messages/sec is the same as, or slightly slower (~5%) than, the ArrayList version.
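          To make the comparison concrete, these are the two hand-off patterns in question. This is a minimal sketch assuming a per-SSP BlockingQueue named queue; the variable names and shapes are illustrative, not the actual Samza code:

          // Variant 1 (the faster one in practice): BlockingEnvelopeMap drains into
          // an ArrayList, which SystemConsumers then copies into an ArrayDeque.
          List<IncomingMessageEnvelope> batch = new ArrayList<IncomingMessageEnvelope>(queue.size());
          queue.drainTo(batch);
          Queue<IncomingMessageEnvelope> buffered = new ArrayDeque<IncomingMessageEnvelope>(batch);

          // Variant 2 (the one that should win, but doesn't): BlockingEnvelopeMap
          // drains straight into an ArrayDeque, and SystemConsumers uses it as-is.
          Queue<IncomingMessageEnvelope> direct = new ArrayDeque<IncomingMessageEnvelope>(queue.size());
          queue.drainTo(direct);

          Variant 2 does strictly less work (one allocation and one drain instead of an allocation, a drain, and a copy), which is what makes the measured result surprising.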
          I added a bunch of counters to BlockingEnvelopeMap so we can understand what's happening inside BlockingEnvelopeMap during this performance test:

          public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
            // ...
            // Dump the counters when the container exits.
            Runtime.getRuntime().addShutdownHook(new Thread() {
              public void run() {
                System.err.println("add: " + addCalls
                    + ", drain: " + drainToCalls
                    + ", timeouts: " + timeoutCalls
                    + ", blocking calls: " + timeRemainingCalls
                    + ", block on outstanding calls: " + blockOnOutstandingCalls
                    + ", polls: " + pollCalls
                    + ", average queue size: " + (totalQueueItems / (float) sspQueuePolls)
                    + ", ssp-queue-polls: " + sspQueuePolls
                    + ", got messages: " + gotMessages);
              }
            });
          }

          // ...

          int addCalls = 0;
          int drainToCalls = 0;
          int totalQueueItems = 0;
          int pollCalls = 0;
          int timeoutCalls = 0;
          int timeRemainingCalls = 0;
          int blockOnOutstandingCalls = 0;
          int sspQueuePolls = 0;
          int gotMessages = 0;

          public Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
            long stopTime = clock.currentTimeMillis() + timeout;
            Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>>();

            metrics.incPoll();
            ++pollCalls;

            for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
              BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
              Queue<IncomingMessageEnvelope> outgoingList = new ArrayDeque<IncomingMessageEnvelope>(queue.size());

              totalQueueItems += queue.size();
              ++sspQueuePolls;

              if (queue.size() > 0) {
                queue.drainTo(outgoingList);
                ++drainToCalls;
              } else if (timeout != 0) {
                IncomingMessageEnvelope envelope = null;
                ++timeoutCalls;

                // How long we can legally block (if timeout > 0).
                long timeRemaining = stopTime - clock.currentTimeMillis();

                if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
                  ++blockOnOutstandingCalls;
                  // Block until we get at least one message, or until we catch
                  // up to the head of the stream.
                  while (envelope == null && !isAtHead(systemStreamPartition)) {
                    metrics.incBlockingPoll(systemStreamPartition);
                    envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
                  }
                } else if (timeout > 0 && timeRemaining > 0) {
                  ++timeRemainingCalls;
                  // Block until we get at least one message.
                  metrics.incBlockingTimeoutPoll(systemStreamPartition);
                  envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
                }

                // If we got a message, add it.
                if (envelope != null) {
                  outgoingList.add(envelope);
                  ++addCalls;

                  // Drain any remaining messages without blocking.
                  queue.drainTo(outgoingList);
                  ++drainToCalls;
                }
              }

              if (outgoingList.size() > 0) {
                messagesToReturn.put(systemStreamPartition, outgoingList);
              }
            }

            if (messagesToReturn.size() > 0) {
              ++gotMessages;
            }

            return messagesToReturn;
          }

          Running with the Queue/ArrayDeque implementation shows:

          add: 0, drain: 4034, timeouts: 2895, blocking calls: 0, block on outstanding calls: 0, polls: 43, average queue size: 1708.5803, ssp-queue-polls: 11683, got messages: 43

          Some interesting stats to note:

          1. Every time SystemConsumers polled BlockingEnvelopeMap, there were messages available to process (polls and got messages are both 43).
          2. SystemConsumers polled BlockingEnvelopeMap only 43 times over an 18s execution window (43/18 ≈ 2.4 polls/sec). The rest of the time there were no empty SSPs in SystemConsumers, so the entire BlockingEnvelopeMap.poll logic was skipped.
          3. No blocking polls were made.
          4. add was never called (because a blocking poll was never made).

          Taken together, the counters imply roughly 1708.58 × 11683 ≈ 20M messages drained over the 18s window, or on the order of 1.1M messages/sec. Based on these metrics, only drainTo and size are ever called on the underlying data structure, and ArrayDeque seems to perform better at both in my little test (a benchmark isolating exactly this pattern is sketched below). Yet the ArrayList in BlockingEnvelopeMap leads to faster performance in TestSamzaContainerPerformance.
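          Since the counters show the hot path is just a fresh per-poll allocation followed by a drainTo, a micro-benchmark that isolates exactly that pattern might be more faithful than the big single-collection test above. A minimal sketch, reusing loadBlockingQueue from the earlier test; the batch size of 1708 comes from the observed average queue size, and the iteration counts are illustrative:

          // Mimic BlockingEnvelopeMap.poll's hot path: allocate a collection
          // sized to the queue, then drain one batch into it, once per "poll".
          int batch = 1708;   // observed average queue size per SSP
          int polls = 10000;

          LinkedBlockingQueue<Integer> queue = loadBlockingQueue(batch * polls);
          long start = System.currentTimeMillis();
          for (int poll = 0; poll < polls; ++poll) {
            Queue<Integer> outgoing = new ArrayDeque<Integer>(batch);
            queue.drainTo(outgoing, batch);
          }
          System.out.println("deque alloc+drainTo: " + (System.currentTimeMillis() - start) + "ms");

          queue = loadBlockingQueue(batch * polls);
          start = System.currentTimeMillis();
          for (int poll = 0; poll < polls; ++poll) {
            List<Integer> outgoing = new ArrayList<Integer>(batch);
            queue.drainTo(outgoing, batch);
          }
          System.out.println("list alloc+drainTo: " + (System.currentTimeMillis() - start) + "ms");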
          I also adjusted the test runtime, the number of messages the MockSystemConsumer was emitting, the log verbosity, and the number of MockSystemConsumer threads. In all cases the results were the same: ArrayList was slightly faster. I must be missing something, but I don't know what. I think the better use of time now is to investigate speeding up the OffsetManager.update call in TaskInstance.process, which takes over half of the CPU time in the main thread (a quick way to confirm that is sketched below). I'll back off the ArrayList/ArrayDeque investigation for now.
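          For reference, a crude way to confirm that OffsetManager.update dominates the main thread, without attaching a profiler, is to time the call inline in TaskInstance.process. A minimal sketch; the update(SystemStreamPartition, String) signature, the envelope accessors, and the updateNanos/updateCalls counters are assumptions for illustration, not actual Samza code:

          // Hypothetical inline timing around the suspected hot call in
          // TaskInstance.process. updateNanos and updateCalls are assumed
          // long fields added to TaskInstance for this experiment.
          long t0 = System.nanoTime();
          offsetManager.update(envelope.getSystemStreamPartition(), envelope.getOffset());
          updateNanos += System.nanoTime() - t0;

          if (++updateCalls % 1000000 == 0) {
            // Print the running average cost per update call.
            System.err.println("avg OffsetManager.update: " + (updateNanos / updateCalls) + "ns");
          }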

            People

            • Assignee: criccomini Chris Riccomini
            • Reporter: criccomini Chris Riccomini