Description
During performance testing of a relatively simple multi-threaded job, we noticed that as the Kafka partitions started to run out of messages, the individual partition tasks that normally processed several messages per ms slowed down significantly, each heading towards a throughput of only one message every 10 ms. Investigation indicated that the AsyncRunLoop was often blocking for 10 ms while polling the empty partitions for new work, and in the process starving the tasks working on partitions that still had work to do.
We found this in a private branch based on 0.12.0, and I have reproduced the issue in master. I coded up a potential fix which works for us in 0.12.0. I have rebased the fix on master and tested it there too.
Here is a detailed description of what I think is going on, assuming there is only one partition with messages to process:
- AsyncRunLoop (main) thread Loop #1
- Chooser selects a message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
- Runs the AsyncTaskWorker associated with that SSP:
- Fetches the first message from the pendingEnvelopeQueue.
- Submits request to the AsyncStreamTaskAdapter to process the first message in a worker thread
- Updates the Chooser with a new message from that SSP (if one is available).
- Worker thread
- Starts processing the message
- AsyncRunLoop (main) thread Loop #2
- Chooser selects a second message for that partition and places it in the pendingEnvelopeQueue for the AsyncTaskState object associated with that SSP
- Runs the AsyncTaskWorker associated with that SSP:
- The Worker thread is still busy processing the previous message so the AsyncTaskWorker does nothing.
- Message is not "fetched" and therefore the Chooser is NOT updated.
- AsyncRunLoop (main) thread Loop #3
- Chooser cannot find a message to process.
- Starts a poll with a timeout of 10 ms on all the partitions with no messages (this does not include the one partition with messages).
- Worker thread
- Completes processing first message
- It updates AsyncTaskWorker state
- Here is where the throughput suffers
- AsyncTaskWorker is ready for more work, but the main thread that hands out the work is stuck "polling"
- AsyncRunLoop (main) thread Loop #3 continues
- Wakes up after 10 ms
- Runs the AsyncTaskWorker associated with that SSP:
- The AsyncTaskWorker is now ready for more work.
- Fetches the second message from the pendingEnvelopeQueue.
- Submits request to the AsyncStreamTaskAdapter to process the second message in a (new) worker thread
- Updates the Chooser with a new message from that SSP (if one is available).
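To make the cost of that sequence concrete, here is a toy virtual-clock model of it. This is only a sketch and not Samza code: the class name, method name, and the 200 µs per-message processing time are assumptions made up for illustration. Only the 10 ms poll timeout and the ordering of the steps come from the description above. It compares a main loop that blocks in a poll with one that wakes as soon as the worker finishes.

```java
// Toy model (hypothetical, not Samza code) of one busy partition and one worker.
// Time is a virtual clock in microseconds; nothing here actually sleeps.
final class PollStallModel {

    // Returns the virtual time at which the last of `messages` finishes processing.
    // blockingPoll = true  : an empty Chooser makes the main loop block for the full poll timeout.
    // blockingPoll = false : an empty Chooser makes the main loop wait until the worker is free.
    static long totalMicros(int messages, long processMicros,
                            long pollTimeoutMicros, boolean blockingPoll) {
        long now = 0;                 // main-loop clock
        long workerFreeAt = 0;        // when the worker finishes its current message
        int chosen = 0;               // messages the Chooser has handed out so far
        int dispatched = 0;           // messages submitted to the worker so far
        boolean pending = false;      // one envelope sitting in the pending queue
        boolean chooserHasMessage = messages > 0;

        while (dispatched < messages) {
            if (chooserHasMessage) {
                // Chooser selects a message; it goes into the pending queue.
                pending = true;
                chooserHasMessage = false;
                chosen++;
            } else if (pending) {
                // Chooser is empty even though an envelope is waiting for the worker.
                if (blockingPoll) {
                    now += pollTimeoutMicros;          // stuck in poll for the full timeout
                } else {
                    now = Math.max(now, workerFreeAt); // woken by worker completion
                }
            }
            if (pending && now >= workerFreeAt) {
                // Worker is idle: fetch the envelope, dispatch it, update the Chooser.
                pending = false;
                dispatched++;
                workerFreeAt = now + processMicros;
                chooserHasMessage = chosen < messages;
            }
        }
        return workerFreeAt;
    }

    public static void main(String[] args) {
        long blocking = totalMicros(3, 200, 10_000, true);
        long waking = totalMicros(3, 200, 10_000, false);
        System.out.println("blocking poll: " + blocking + " us, wake on completion: " + waking + " us");
    }
}
```

With these assumed numbers, three messages take 20,200 µs in the blocking case (about one poll timeout per message after the first) versus 600 µs when the main loop wakes on completion, which matches the "one message every 10 ms" behavior we observed.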
The key changes in my fix are:
- In the AsyncRunLoop (multi-threaded) case, don't "poll" when the Chooser returns no messages. Just "drain" any messages that have already arrived.
- Instead, "wait" in the main loop when the Chooser returns no messages, using the existing synchronization lock that is already used to wait when all the AsyncTaskWorkers are busy.
- There is one boolean added to deal with a race condition which could otherwise trigger "waits" we don't need and therefore hurt throughput. It may cause us to occasionally skip a "wait" that would actually be OK, but I think the performance penalty of that is pretty small (one extra spin of the main loop when there is no work to handle).
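The general shape of these changes can be sketched as below. This is an illustration of the wait-plus-flag pattern only, not the attached patch: the class `RunLoopGate`, its methods, and the field name are all hypothetical, and the real code uses the lock that already exists in the run loop. The flag records that a worker signalled since the last wait, so a signal that arrives just before the main thread starts waiting is not lost.

```java
// Hypothetical sketch of "wait on a lock instead of polling", guarded by a boolean.
// Names are invented for illustration; this is not the code in the patch.
final class RunLoopGate {
    private final Object lock = new Object();
    // True when a worker has signalled since the main loop last checked.
    private boolean workMayBeAvailable = false;

    // Called from a worker thread when it finishes a message (or new work arrives).
    void signalWork() {
        synchronized (lock) {
            workMayBeAvailable = true;
            lock.notifyAll();
        }
    }

    // Called from the main loop when the Chooser returned no message.
    // maxWaitMillis must be > 0 (Object.wait(0) would wait indefinitely).
    // Returns true if the caller actually waited, false if the wait was skipped.
    boolean awaitWork(long maxWaitMillis) {
        synchronized (lock) {
            boolean waited = false;
            if (!workMayBeAvailable) {
                try {
                    lock.wait(maxWaitMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                }
                waited = true;
            }
            // Reset the flag. A signal consumed here may occasionally make the
            // next call skip a wait that would have been fine, costing one extra
            // spin of the main loop.
            workMayBeAvailable = false;
            return waited;
        }
    }

    public static void main(String[] args) {
        RunLoopGate gate = new RunLoopGate();
        gate.signalWork();                          // worker finished before the loop waits
        System.out.println("waited: " + gate.awaitWork(10)); // wait is skipped
    }
}
```

Without the flag, a `signalWork()` that lands between "Chooser returned nothing" and the call to `wait` would go unnoticed, and the main loop would sleep for the full timeout even though a worker is ready.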
The change I made is pretty small, but the code it is changing is fairly complex. I would be very interested in:
- Getting your insights on the problem I have described. Maybe there is a setting that could work around this behavior. I could not find one.
- Feedback on my proposed solution. Especially potential dangers.
I have attached a patch file to this issue. I can open a merge request if that would be a better way to get your input.
Here is a chart that summarizes some testing I did with a small test job. The details are: