Samza / SAMZA-635

KafkaSystemProducer may get exceptions out-of-order


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0
    • Fix Version/s: None
    • Component/s: kafka
    • Labels: None

    Description

      In the current KafkaSystemProducer design, a non-retriable exception can be thrown from the Kafka producer send thread, which creates race conditions in the following code blocks:

      82    sendFailed.set(false)
      83
      84    retryBackoff.run(
      85      loop => {
      86        if (sendFailed.get()) {
      87          throw exceptionThrown.get()
      88        }
      

      And

      91            def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      92              if (exception == null) {
      93                //send was successful. Don't retry
      94                metrics.sendSuccess.inc
      95              } else {
      96                //If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
      97                //Hence, fail container!
      98                exceptionThrown.compareAndSet(null, exception)
      99                sendFailed.set(true)
      100              }
      101            }
      

      The main thread sets and reads sendFailed on lines 82 and 86, while the Kafka send thread sets it on line 99.

      There could be two race conditions here:
      1) The Kafka send thread completes line 99, and then the main thread executes line 82. The reset clears the flag, so the exception is missed.
      2) The main thread finishes line 82 for the current message, and then the Kafka send thread executes line 99 for the previous message. In this case, the main thread gets an exception that belongs to the previous message, not the current one.
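
      The two interleavings can be replayed deterministically on a single thread. The sketch below is not the KafkaSystemProducer code: SharedState and its method names are hypothetical stand-ins for the two atomics and for the statements on lines 82, 86-87 and 98-99, and the exception messages are made up for illustration.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

object SendFailedRace {
  // Hypothetical stand-in for the state shared by the main thread and the send thread.
  final class SharedState {
    val sendFailed = new AtomicBoolean(false)
    val exceptionThrown = new AtomicReference[Exception](null)

    // Line 82: the main thread resets the flag before each send.
    def resetBeforeSend(): Unit = sendFailed.set(false)

    // Lines 86-87: the main thread checks the flag inside the retry loop.
    def check(): Option[Exception] =
      if (sendFailed.get()) Option(exceptionThrown.get()) else None

    // Lines 98-99: the Kafka send thread records a failure in the callback.
    def onFailure(e: Exception): Unit = {
      exceptionThrown.compareAndSet(null, e)
      sendFailed.set(true)
    }
  }

  // Race 1: the callback for message 1 fires, then the reset for message 2 clears it.
  def lostException(): Option[Exception] = {
    val s = new SharedState
    s.resetBeforeSend()                                      // main thread, message 1
    s.onFailure(new RuntimeException("message 1 rejected"))  // send thread, message 1
    s.resetBeforeSend()                                      // main thread, message 2
    s.check()                                                // no failure is visible
  }

  // Race 2: the callback for message 1 fires after the reset for message 2.
  def misattributedException(): Option[Exception] = {
    val s = new SharedState
    s.resetBeforeSend()                                      // main thread, message 1
    s.check()                                                // message 1 still in flight
    s.resetBeforeSend()                                      // main thread, message 2
    s.onFailure(new RuntimeException("message 1 rejected"))  // late callback, message 1
    s.check()                                                // message 2 sees message 1's error
  }

  def main(args: Array[String]): Unit = {
    println(s"race 1 observed: ${lostException().map(_.getMessage)}")
    println(s"race 2 observed: ${misattributedException().map(_.getMessage)}")
  }
}
```

      In the first replay the check finds nothing even though a send failed. In the second, the check made on behalf of message 2 surfaces the failure of message 1.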

      A configuration that can trigger this is, on the producer side:

      systems.kafka.producer.max.request.size=102400
      

      Broker side:

      message.max.bytes=10240
      

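      Then, inside task.process(), we send a 16 KB message first, followed by a small message. The 16 KB message is below the producer-side limit but above the broker-side limit, so the broker rejects it and the failure reaches the callback asynchronously, possibly after the small message has already been sent.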
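
      To make the size arithmetic explicit, here is a minimal sketch that classifies a payload against the two limits from this report. SizeLimits, Outcome and classify are hypothetical names, and the comparison is simplified: it ignores the record and request overhead that Kafka adds on top of the payload.

```scala
object SizeLimits {
  // Values taken from the configuration in this report.
  val ProducerMaxRequestSize = 102400 // systems.kafka.producer.max.request.size
  val BrokerMessageMaxBytes = 10240   // message.max.bytes

  sealed trait Outcome
  case object Accepted extends Outcome
  case object RejectedByProducer extends Outcome // rejected on the client side
  case object RejectedByBroker extends Outcome   // rejected later; error arrives via the callback

  // Simplified check: compares the raw payload size against each limit in turn.
  def classify(payloadBytes: Int): Outcome =
    if (payloadBytes > ProducerMaxRequestSize) RejectedByProducer
    else if (payloadBytes > BrokerMessageMaxBytes) RejectedByBroker
    else Accepted

  def main(args: Array[String]): Unit = {
    println(classify(16 * 1024)) // the large message from the scenario
    println(classify(100))       // a small follow-up message
  }
}
```

      Under these assumptions the 16 KB message (16384 bytes) falls between the two limits, which is the case where the error is only reported back through the send thread.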


          People

            Assignee: Unassigned
            Reporter: Yi Pan (nickpan47)
