Samza / SAMZA-1599

AsyncRunLoop Slow When Some Partitions Are Empty

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.12.0, 0.14.0
    • Fix Version/s: 0.14.1
    • Component/s: container
    • Labels: None

    Description

      During performance testing of a relatively simple multi-threaded job, we noted that as the Kafka partitions started to run dry, the individual partition tasks that normally processed several messages per millisecond slowed down significantly, heading toward a throughput of only one message every 10 ms. Investigation indicated that the AsyncRunLoop was often blocking for 10 ms polling the empty partitions for new work, and in the process starving the tasks working on partitions that still had messages to process.

      We found this in a private branch based on 0.12.0, and I have reproduced the issue on master. I coded up a potential fix that works for us on 0.12.0; I have rebased the fix onto master and tested it there too.

      Here is a detailed description of what I think is going on, assuming there is only one partition with messages to process (a simplified sketch of the loop follows the list):

      • AsyncRunLoop (main) thread Loop #1
        • Chooser selects a message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
        • Runs the AsyncTaskWorker associated with that SSP.
        • Fetches the first message from the pendingEnvelopeQueue.
        • Submits a request to the AsyncStreamTaskAdapter to process the first message in a worker thread.
        • Updates the Chooser with a new message from that SSP (if one is available).
      • Worker thread
        • Starts processing the message
      • AsyncRunLoop (main) thread Loop #2
        • Chooser selects a second message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
        • Runs the AsyncTaskWorker associated with that SSP:
        • The Worker thread is still busy processing the previous message so the AsyncTaskWorker does nothing.
        • Message is not "fetched" and therefore the Chooser is NOT updated.
      • AsyncRunLoop (main) thread Loop #3
        • Chooser cannot find a message to process.
        • Starts a poll with a timeout of 10 ms on all the partitions with no messages (this does not include the one partition that has messages).
      • Worker thread
        • Completes processing the first message.
        • Updates the AsyncTaskWorker state.
      • Here is where the throughput suffers
        • The AsyncTaskWorker is ready for more work, but the main thread that hands out the work is stuck "polling".
      • AsyncRunLoop (main) thread Loop #3 continues
        • Wakes up after 10 ms
        • Runs the AsyncTaskWorker associated with that SSP.
        • The AsyncTaskWorker is now ready for more work.
        • Fetches the second message from the pendingEnvelopeQueue.
        • Submits a request to the AsyncStreamTaskAdapter to process the second message in a (new) worker thread.
        • Updates the Chooser with a new message from that SSP (if one is available).
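
      To make the interaction easier to follow, here is a minimal sketch of the dispatch pattern described in the list above. It uses invented types and method names (Chooser, TaskWorker, blockingPoll, and so on) rather than Samza's actual AsyncRunLoop internals, and is only meant to show where the 10 ms blocking poll sits relative to the worker hand-off.

```java
// Minimal sketch of the dispatch pattern described in the walkthrough above.
// All types and method names here are hypothetical stand-ins, NOT Samza APIs.
import java.util.Map;

final class RunLoopSketch {
  private static final long POLL_TIMEOUT_MS = 10;

  private final Chooser chooser;
  private final Map<String, TaskWorker> workersBySsp; // one worker per SSP
  private volatile boolean running = true;

  RunLoopSketch(Chooser chooser, Map<String, TaskWorker> workersBySsp) {
    this.chooser = chooser;
    this.workersBySsp = workersBySsp;
  }

  void run() {
    while (running) {
      Envelope envelope = chooser.choose(); // may return null
      if (envelope != null) {
        // Loops #1/#2: queue the message for the task that owns its SSP. A busy
        // worker ignores the run request, so the Chooser is NOT refreshed for
        // that SSP until the queued message is actually fetched.
        TaskWorker worker = workersBySsp.get(envelope.ssp);
        worker.enqueue(envelope);
        worker.runIfReady();
      } else {
        // Loop #3: nothing chosen, so block for up to 10 ms polling the EMPTY
        // partitions -- even if a worker finishes its current message (and is
        // ready for more work) long before the timeout expires.
        chooser.blockingPoll(POLL_TIMEOUT_MS);
      }
    }
  }

  interface Chooser { Envelope choose(); void blockingPoll(long timeoutMs); }
  interface TaskWorker { void enqueue(Envelope e); void runIfReady(); }

  static final class Envelope {
    final String ssp;      // the SystemStreamPartition this message came from
    final Object message;
    Envelope(String ssp, Object message) { this.ssp = ssp; this.message = message; }
  }
}
```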

      The key changes in my fix are:

      • In the AsyncRunLoop (multi-threaded) case, don't "poll" when the Chooser returns no messages; just "drain" any messages that have already arrived.
      • Instead, have the main loop "wait" when the Chooser returns no messages, using the existing synchronization lock that is already used to wait when all the AsyncTaskWorkers are busy (sketched below).
      • One boolean is added to deal with a race condition that could otherwise trigger "waits" we don't need and therefore hurt throughput. It may cause us to occasionally skip a "wait" that would actually be OK, but I think the performance penalty of that is pretty small (one extra spin of the main loop when there is no work to handle).
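
      To illustrate the last two bullets, here is a minimal sketch of the wait/notify pattern with the extra boolean. This is not the attached patch: the class, field, and method names are invented for the example, and the real change lives inside AsyncRunLoop's existing coordination code.

```java
// Sketch of the "wait instead of poll" idea with the race-condition guard.
// Names are illustrative; the real patch reuses AsyncRunLoop's existing lock.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class RunLoopWaitSketch {
  private final ReentrantLock coordinationLock = new ReentrantLock();
  private final Condition workAvailable = coordinationLock.newCondition();
  // Set by worker threads when they finish a message; cleared by the main loop
  // before it decides to wait. Covers the race where a worker completes between
  // "Chooser returned null" and "main loop starts waiting".
  private boolean workerCompletedSinceLastCheck = false;

  /** Called from the main loop when the Chooser returns no message. */
  void waitForMoreWork(long maxWaitMs) throws InterruptedException {
    coordinationLock.lock();
    try {
      if (workerCompletedSinceLastCheck) {
        // A worker just finished: skip the wait and spin the loop once more.
        // At worst this is one wasted pass when there is no new work.
        workerCompletedSinceLastCheck = false;
        return;
      }
      workAvailable.await(maxWaitMs, TimeUnit.MILLISECONDS);
    } finally {
      coordinationLock.unlock();
    }
  }

  /** Called from a worker thread when it finishes processing a message. */
  void onWorkerComplete() {
    coordinationLock.lock();
    try {
      workerCompletedSinceLastCheck = true;
      workAvailable.signalAll();
    } finally {
      coordinationLock.unlock();
    }
  }
}
```

      Because the worker signals under the same lock the main loop waits on, a completion cannot be missed; at worst the flag forces one extra spin of the loop, which is the small penalty mentioned in the last bullet.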

      The change I made is pretty small, but the code it is changing is fairly complex. I would be very interested in:

      • Getting your insights on the problem I have described. Maybe there is a setting that could work around this behavior.  I could not find one.
      • Feedback on my proposed solution. Especially potential dangers.

      I have attached a patch file to the issue. I can open a merge request if that would be a better way to get your input.

      Attachments

        1. slow-async-run-loop.patch
          17 kB
          James Lent

        Activity

          jwlent55 James Lent added a comment -

           Here is a chart that summarizes some testing I did with a small test job.  The details are:

          • 6 Partitions in Kafka Topic.
          • Only one partition had messages and it had 100.
          • Each run restarted at the beginning of the topic.
          • The time reported is the time between when the first and the last messages were processed.
          • The process method just counts events and logs the number processed (i.e. it does basically nothing); a sketch follows this list.
          • There were variations in the times for each scenario tested (I ran it on my dev box).  I would therefore not read anything into the small variations between "new" and "old".
          • Old refers to the 0.14.1-SNAPSHOT code (as it existed yesterday: 1dfc5ce)
          • New refers to that same code with my patch applied.
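
          The test job itself is not attached, but a task along the following lines (a hypothetical reconstruction, not the actual code) matches the description above: it counts envelopes and logs the running total while doing no real work. It uses Samza's StreamTask interface.

```java
// Hypothetical reconstruction of the test task described above: count each
// incoming envelope and log the running total, doing no other work.
import java.util.concurrent.atomic.AtomicLong;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountingTask implements StreamTask {
  private static final Logger LOG = LoggerFactory.getLogger(CountingTask.class);
  private final AtomicLong processed = new AtomicLong();

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // "Does basically nothing": the measured time is therefore dominated by the
    // run loop's dispatch behaviour rather than by message processing.
    LOG.info("Processed {} messages", processed.incrementAndGet());
  }
}
```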

           

          | pool.size     | thread.mode   | Got Thread Pool Size | Got Single Thread Mode | Process Thread   | Run Loop Mode | Old Process Time (ms) | New Process Time (ms) |
          | not specified | not specified | 0                    | false                  | ThreadJob        | asynchronous  | 14                    | 14                    |
          | not specified | false         | 0                    | false                  | ThreadJob        | asynchronous  | 16                    | 15                    |
          | not specified | true          | 0                    | true                   | ThreadJob        | single thread | 12                    | 12                    |
          | 1             | not specified | 1                    | false                  | Container Thread | asynchronous  | 1022                  | 16                    |
          | 1             | false         | 1                    | false                  | Container Thread | asynchronous  | 1023                  | 16                    |
          | 1             | true          | 1                    | true                   | ThreadJob        | single thread | 13                    | 12                    |
          | 6             | not specified | 6                    | false                  | Container Thread | asynchronous  | 1015                  | 21                    |
          | 6             | false         | 6                    | false                  | Container Thread | asynchronous  | 1005                  | 24                    |
          | 6             | true          | 6                    | true                   | ThreadJob        | single thread | 11                    | 12                    |
          nickpan47 Yi Pan added a comment -

          Hi, jwlent55, thank you so much for the detailed analysis and the patch! Appreciate it! Can you open a PR on github as well? Thanks!

          jwlent55 James Lent added a comment -

          Thanks for your interest in looking at this issue. I have opened a pull request (#436); it has passed the checks. I look forward to comments/suggestions/gotchas.

          jagadish1989@gmail.com Jagadish added a comment -

          Heya jwlent55, this is extremely thorough. Impressive work! Will review this PR soon.

          nickpan47 Yi Pan added a comment -

          PR #436 is merged and submitted.


          People

            Assignee: James Lent (jwlent55)
            Reporter: James Lent (jwlent55)
            Votes: 0
            Watchers: 3
