  1. Samza
  2. SAMZA-113 Improve log performance
  3. SAMZA-384

TaskInstance.send is slow with high task count

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: container
    • Labels: None

    Description

      I have a job that runs with ~235 TaskInstances per container. The SamzaContainer is spending about 20% of its CPU time on a trace() logging call inside TaskInstance.send. The code for this method is:

        def send {
          if (collector.envelopes.size > 0) {
            trace("Sending messages for taskName: %s, %s" format (taskName, collector.envelopes.size))
      
            metrics.sends.inc
            metrics.messagesSent.inc(collector.envelopes.size)
      
            collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope))
      
            trace("Resetting collector for taskName: %s" format taskName)
      
            collector.reset
          } else {
            trace("Skipping send for taskName %s because no messages were collected." format taskName)
      
            metrics.sendsSkipped.inc
          }
        }
      

      This method is invoked from the RunLoop.send method:

        private def send {
          updateTimer(metrics.sendMs) {
            trace("Triggering send in task instances.")
            metrics.sends.inc
            taskInstances.values.foreach(_.send)
          }
        }
      

      So, I believe the problem here is that every send() invocation in the RunLoop ends up running 235 send() calls on my 235 TaskInstances.

      Since the RunLoop doesn't know which TaskInstances actually have messages to send, it has to call send() on all of them. I took a look at my metrics, and the vast majority of the time the TaskInstance.send method just skips the send (metrics.sendsSkipped.inc), so nearly all of the time spent in that trace call is wasted.

      The easiest solution here is to remove the trace call when there are no outgoing messages in TaskInstance.send.
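
      A minimal sketch of that change, for illustration only. Counter, Metrics, Collector and TaskInstanceSketch are hypothetical stand-ins rather than the real Samza classes, and trace() here only counts calls so the effect is visible without a logger:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the real metrics and collector classes.
class Counter {
  var count = 0L
  def inc(): Unit = count += 1
  def inc(n: Long): Unit = count += n
}

class Metrics {
  val sends = new Counter
  val messagesSent = new Counter
  val sendsSkipped = new Counter
}

class Collector {
  val envelopes = ArrayBuffer[String]()
  def reset(): Unit = envelopes.clear()
}

// Assumed simplification: envelopes are plain strings and `sink` replaces
// producerMultiplexer.send.
class TaskInstanceSketch(taskName: String, sink: String => Unit) {
  val metrics = new Metrics
  val collector = new Collector
  var traceCalls = 0

  // Stand-in for the logging call: counts invocations instead of logging.
  private def trace(msg: => String): Unit = traceCalls += 1

  def send(): Unit = {
    if (collector.envelopes.nonEmpty) {
      trace(s"Sending messages for taskName: $taskName, ${collector.envelopes.size}")
      metrics.sends.inc()
      metrics.messagesSent.inc(collector.envelopes.size)
      collector.envelopes.foreach(sink)
      trace(s"Resetting collector for taskName: $taskName")
      collector.reset()
    } else {
      // No trace() here: this is the branch taken on almost every call.
      metrics.sendsSkipped.inc()
    }
  }
}
```

      With this shape, a send() on a task with nothing collected only increments sendsSkipped and never touches the logging path.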

      Another solution would be to modify the RunLoop/TaskInstance in such a way that the RunLoop would know which TaskInstances it needs to call send() on, and only call send() on those TaskInstances. Presumably, this would have to be done with a callback or something.
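
      One way the callback idea could look, as a rough sketch under assumptions: NotifyingCollector, TaskSketch and RunLoopSketch are hypothetical names that do not exist in Samza, envelopes are plain strings, and everything runs on a single thread as in the run loop above. The collector tells the run loop when a task gets its first pending message, and send() then visits only those tasks:

```scala
import scala.collection.mutable

// Hypothetical collector that fires a callback when it goes from empty to non-empty.
class NotifyingCollector(onFirstMessage: () => Unit) {
  val envelopes = mutable.ArrayBuffer[String]()

  def send(envelope: String): Unit = {
    if (envelopes.isEmpty) onFirstMessage()
    envelopes += envelope
  }

  def reset(): Unit = envelopes.clear()
}

// Hypothetical task: registers itself as pending through `markPending`.
class TaskSketch(val taskName: String,
                 markPending: TaskSketch => Unit,
                 sink: String => Unit) {
  val collector = new NotifyingCollector(() => markPending(this))
  var sendCalls = 0

  def send(): Unit = {
    sendCalls += 1
    collector.envelopes.foreach(sink)
    collector.reset()
  }
}

// Hypothetical run loop: tracks which tasks have something to send.
class RunLoopSketch(taskNames: Seq[String], sink: String => Unit) {
  private val pending = mutable.LinkedHashSet[TaskSketch]()

  val taskInstances: Map[String, TaskSketch] =
    taskNames.map { name =>
      name -> new TaskSketch(name, t => { pending += t; () }, sink)
    }.toMap

  // Only tasks that collected messages since the last send are visited.
  def send(): Unit = {
    pending.foreach(_.send())
    pending.clear()
  }
}
```

      The cost is extra bookkeeping and coupling between the collector and the run loop, which is the refactoring this ticket weighs against simply dropping the log line.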

      I took a look at the RunLoop, and of the four tight-loop methods (process, send, window, and commit), the only one that iterates over all TaskInstances on every invocation is send(). The rest are either time-bounded (e.g. once every 60 seconds) or only call methods on a single TaskInstance (process). My inclination is therefore to just remove this log line in TaskInstance.send, rather than refactor the code.

      Attachments

        1. SAMZA-384-1.patch
          23 kB
          Chris Riccomini
        2. SAMZA-384-0.patch
          0.6 kB
          Chris Riccomini
        3. SAMZA-384-0.patch
          24 kB
          Chris Riccomini

            People

              criccomini Chris Riccomini
              criccomini Chris Riccomini