I have a job that runs with ~235 TaskInstances per container. I'm seeing the SamzaContainer spend about 20% of its CPU time on a trace() logging call inside TaskInstance.send. The code for this method is:
This method is invoked from the RunLoop.send method:
I believe the problem is that every RunLoop.send() invocation results in 235 TaskInstance.send() calls, one for each of my 235 TaskInstances.
Since the RunLoop doesn't know which TaskInstances actually have messages to send, it has to call send() on all of them. According to my metrics, the overwhelming majority of the time TaskInstance.send skips the send entirely (metrics.sendsSkipped.inc), so the time spent in the trace() call is wasted.
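To make the cost concrete, here is a simplified Java model of the send path described above. It is a hypothetical sketch, not Samza's code: the class names, the `pendingMessages` buffer, and the `traceCalls` counter are illustrative, and it assumes the trace() call runs on every TaskInstance.send invocation, including the skip path. With 235 instances and only one holding output, a single loop iteration still performs 235 trace calls and 234 skipped sends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the per-iteration send path.
// Names are illustrative only; this is not Samza's TaskInstance or RunLoop.
class SketchTaskInstance {
    final List<String> pendingMessages = new ArrayList<>();
    int sends = 0;
    int sendsSkipped = 0;
    int traceCalls = 0;

    void send() {
        // Assumed: a trace() call that runs on every invocation, even when idle.
        traceCalls++;
        if (pendingMessages.isEmpty()) {
            sendsSkipped++;  // mirrors metrics.sendsSkipped.inc
            return;
        }
        sends++;
        pendingMessages.clear();  // stand-in for flushing to the producers
    }
}

class SketchRunLoop {
    final List<SketchTaskInstance> taskInstances = new ArrayList<>();

    // Every call visits every TaskInstance, whether or not it has output.
    void send() {
        for (SketchTaskInstance t : taskInstances) {
            t.send();
        }
    }
}
```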
The easiest fix is to remove the trace call from TaskInstance.send when there are no outgoing messages.
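A minimal sketch of that fix, again using hypothetical names and a counting stand-in for the logger rather than Samza's actual code: check for pending output first and return early, incrementing the skip metric but never reaching the log call. Logging then only happens on the rare path where there is real work to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the early-return fix; names are illustrative, not Samza's.
class QuietTaskInstance {
    final List<String> pendingMessages = new ArrayList<>();
    int sends = 0;
    int sendsSkipped = 0;
    int logCalls = 0;

    // Stand-in for a logger's trace(); it only counts invocations and never
    // evaluates the message supplier.
    private void trace(Supplier<String> message) {
        logCalls++;
    }

    void send() {
        if (pendingMessages.isEmpty()) {
            sendsSkipped++;  // keep the metric, skip the log line entirely
            return;
        }
        trace(() -> "Sending " + pendingMessages.size() + " messages");
        sends++;
        pendingMessages.clear();  // stand-in for flushing to the producers
    }
}
```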
Another solution would be to modify the RunLoop and TaskInstance so that the RunLoop knows which TaskInstances need a send() call and calls send() only on those. Presumably this would be done with a callback.
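One possible shape for the callback approach is a "dirty set": each task instance notifies the run loop when it buffers an outgoing message, and the run loop's send() visits only the instances in that set. This is a hypothetical sketch that assumes a single-threaded run loop; `collect`, `onHasOutput`, and `DirtyTrackingRunLoop` are illustrative names, not Samza APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: task instances report output to the run loop via a callback.
class NotifyingTaskInstance {
    private final List<String> pendingMessages = new ArrayList<>();
    private final Consumer<NotifyingTaskInstance> onHasOutput;  // hypothetical callback
    int sends = 0;

    NotifyingTaskInstance(Consumer<NotifyingTaskInstance> onHasOutput) {
        this.onHasOutput = onHasOutput;
    }

    // Stand-in for the collector's send(): buffer the message, then signal the run loop.
    void collect(String message) {
        pendingMessages.add(message);
        onHasOutput.accept(this);
    }

    void send() {
        sends++;
        pendingMessages.clear();  // stand-in for flushing to the producers
    }
}

class DirtyTrackingRunLoop {
    // A set, so repeated notifications queue an instance at most once per send().
    private final Set<NotifyingTaskInstance> dirty = new LinkedHashSet<>();
    final List<NotifyingTaskInstance> taskInstances = new ArrayList<>();

    NotifyingTaskInstance newTaskInstance() {
        NotifyingTaskInstance t = new NotifyingTaskInstance(dirty::add);
        taskInstances.add(t);
        return t;
    }

    // Visits only instances that reported output since the last call; returns how many.
    int send() {
        int visited = 0;
        for (NotifyingTaskInstance t : dirty) {
            t.send();
            visited++;
        }
        dirty.clear();
        return visited;
    }
}
```

The trade-off is one set insertion per outgoing message in exchange for never touching idle instances, which only pays off when most instances are idle on most iterations, as the sendsSkipped metric suggests here.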
I looked at the RunLoop, and of the four tight-loop methods (process, send, window, and commit), send() is the only one that iterates over all TaskInstances on every invocation. The others are either time-bounded (e.g. once every 60 seconds) or call methods on only a single TaskInstance (process). My inclination, then, is to remove this log line from TaskInstance.send rather than refactor the code.