WorkerState.transferLocalBatch uses an int lastOverflowCount to track the size of the overflow queue, and periodically resends the backpressure status to remote workers if the queue continues to grow.
The current implementation has two problems:
- The single variable tracks the receive queue of every executor in the worker, meaning it will be overwritten as tuples are sent to different executors.
- The variable is locally scoped, and so is not carried over between mini-batches.
This only comes into effect when the overflow queue grows beyond 10000, which shouldn't happen unless an upstream worker fails to receive a backpressure signal. If it does happen, though, a backpressure signal is going to be sent for every mini-batch processed. I do not know if this is the intended behaviour, but the way the code is written seems to indicate that it isn't.
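To make the two problems concrete, here is a simplified model of the pattern described above. It is not the actual Storm code: the class SharedCounterDemo, the method processBatch and the map-based batch representation are all made up for illustration, and only the 10000 threshold and the name lastOverflowCount come from the description.

```java
import java.util.Map;

// Hypothetical stand-in for the pattern described in this report; not the real WorkerState.
class SharedCounterDemo {
    static final int THRESHOLD = 10000;

    // Models one mini-batch: taskId -> current overflow queue size of that task's executor.
    // Returns how many backpressure resends the batch would trigger.
    static int processBatch(Map<Integer, Integer> overflowByTask) {
        int lastOverflowCount = 0; // locally scoped, so it starts from 0 on every call
        int resends = 0;
        for (Map.Entry<Integer, Integer> entry : overflowByTask.entrySet()) {
            int currOverflowCount = entry.getValue();
            if (currOverflowCount - lastOverflowCount > THRESHOLD) {
                resends++;
                // One variable for all tasks: the next task is compared against this value.
                lastOverflowCount = currOverflowCount;
            }
        }
        return resends;
    }
}
```

In this model, calling processBatch twice with the same over-threshold queue triggers a resend both times, because the counter is not kept between calls. A task with a large queue can also hide a second task that is over the threshold, because both are compared against the same variable.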
I have thought of two redesigns to fix these problems and make the behaviour align with how one would interpret the code:
- Change the lastOverflowCount variable to a map of taskId to overflow count - This will retain the behaviour of resending the backpressure update every mini-batch once over the threshold, if that behaviour is intended. However, it will increase garbage by creating a new map every time WorkerState.transferLocalBatch is called by the NettyWorker thread.
- Change the lastOverflowCount variable to a map of taskId to overflow count and move it to the BackPressureTracker class - This will retain the counter between batches, and so only resend backpressure status every 10000 received tuples per task.
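A minimal sketch of the bookkeeping behind the second option follows. The class name OverflowResendTracker and its methods shouldResend and reset are hypothetical and do not exist in Storm; in the actual proposal this state would live inside BackPressureTracker. It uses a plain HashMap on the assumption that only one NettyWorker thread calls it; if that assumption is wrong, a ConcurrentHashMap would be the obvious substitute.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating option 2; not part of Storm's API.
class OverflowResendTracker {
    private final int threshold;
    // taskId -> overflow queue size at the time backpressure status was last sent for that task
    private final Map<Integer, Integer> lastOverflowCount = new HashMap<>();

    OverflowResendTracker(int threshold) {
        this.threshold = threshold;
    }

    // Returns true when this task's overflow queue has grown by more than the threshold
    // since the last send, and records the new size. State survives across mini-batches.
    boolean shouldResend(int taskId, int currOverflowCount) {
        int last = lastOverflowCount.getOrDefault(taskId, 0);
        if (currOverflowCount - last > threshold) {
            lastOverflowCount.put(taskId, currOverflowCount);
            return true;
        }
        return false;
    }

    // Forgets a task, for example once its overflow queue has drained (assumed call site).
    void reset(int taskId) {
        lastOverflowCount.remove(taskId);
    }
}
```

With a threshold of 10000, a task whose queue reaches 10001 triggers one resend, and no further resend happens for that task until the queue has grown by more than another 10000, regardless of how many mini-batches are processed in between. Other tasks are tracked independently.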
My preference is for the second option. If the intended behaviour is to resend every mini-batch, the code should be rewritten so that the intent is explicit.
It is also possible that doing it the second way could run into concurrency issues I didn't think of, but as far as I can tell the topology.worker.receiver.thread.count config option isn't used at all? If that's the case and there is only one NettyWorker thread per worker then it should be fine.
I have implemented both methods and attempted to benchmark them with https://github.com/yahoo/storm-perf-test, but as I am running all workers on one machine I couldn't get it to the point where the relevant code was ever called.