Samza / SAMZA-1384

Race condition with async commit affects checkpoint correctness



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Version: 0.13.1


      tl;dr: If any in-flight request updates the offsets between producer.flush() and offsetmanager.checkpoint(), we can write a checkpoint for a message that has not yet gone out over the wire and that could subsequently fail.

      Consider two threads A and B. A is performing an async commit. B is an in-flight process(). The following sequence will cause data loss:
      A: TaskInstance.commit() begins
      A: producer.flush() is called // no new messages will go out in this batch

      B: producer.send() is called
      B: TaskCallback is invoked for the finished process()
      B: OffsetManager records the offset for the completed process()

      A: producer.flush() finishes
      A: checkpoint is written using the latest offsets from the OffsetManager. This INCLUDES the offset for the latest send, which has not yet gone out over the wire.
      A: TaskInstance.commit() finishes

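      B: The callback for producer.send() is invoked with an error. The send failed, but its offset has already been checkpointed.
      B: Exception is propagated and the container fails
      Result: The container is restarted and resumes from the last checkpoint. Because that checkpoint already covers the message whose send failed, the message is not reprocessed and its output is lost.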

      Note that this is only an issue when the commit() occurs concurrently with in-flight requests, so it doesn't affect the fully-synchronous mode or concurrent mode with synchronous commit().
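
      The interleaving above can be replayed deterministically in a single thread. The sketch below is illustrative only: CommitRaceReplay, lastProcessedOffset and pendingSends are hypothetical stand-ins for the task commit path, the OffsetManager and the producer's buffer, not Samza classes, and the offsets 41 and 42 are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded replay of the A/B interleaving from the report.
class CommitRaceReplay {
    // Stand-in for the OffsetManager: highest offset whose process() completed.
    static long lastProcessedOffset = -1;
    // Stand-in for the producer: sends that are buffered but not yet acknowledged.
    static final List<Long> pendingSends = new ArrayList<>();

    /** Replays the sequence and returns the offset that ends up in the checkpoint. */
    static long replay() {
        lastProcessedOffset = 41L; // assumption: offsets up to 41 are fully sent
        pendingSends.clear();

        // A: commit() begins; flush() only covers sends issued before this point.
        List<Long> coveredByFlush = new ArrayList<>(pendingSends);

        // B: process() for offset 42 calls send(), completes, and records its offset.
        pendingSends.add(42L);
        lastProcessedOffset = 42L;

        // A: flush() finishes; the send for offset 42 was not part of it.
        pendingSends.removeAll(coveredByFlush);

        // A: the checkpoint reads the LATEST offset, which now includes 42.
        return lastProcessedOffset;
    }

    public static void main(String[] args) {
        long checkpointed = replay();
        System.out.println("checkpointed offset: " + checkpointed);
        System.out.println("sends still in flight: " + pendingSends);
    }
}
```

      In this replay the checkpoint covers offset 42 while its send is still pending, which is the window in which a later send failure loses data.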

      Proposed solution:
      Take a snapshot of the offsets in the OffsetManager at the beginning of commit(). Checkpoint only those offsets, and nothing that has been recorded since the commit() started.
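
      A minimal sketch of the snapshot idea, under stated assumptions: SnapshotCommitSketch, recordProcessed and the Runnable passed as the flush step are hypothetical names for illustration, not the Samza implementation or the actual fix. It also assumes, as in the sequence above, that process() calls send() before its offset is recorded, so every offset in the snapshot has its sends covered by the flush that follows.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of "snapshot offsets first, then flush, then checkpoint".
class SnapshotCommitSketch {
    // Stand-in for the OffsetManager: last processed offset per partition.
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();
    // Stand-in for the checkpoint store: the most recently written checkpoint.
    private volatile Map<String, Long> lastCheckpoint = Collections.emptyMap();

    /** Called when a process() completes (thread B in the report). */
    void recordProcessed(String partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    /** Called by the committing thread (thread A in the report). */
    void commit(Runnable flush) {
        // 1. Copy the offsets BEFORE flushing; later updates are not included.
        Map<String, Long> snapshot = new HashMap<>(lastProcessed);
        // 2. Flush; this covers every send issued before the snapshot was taken.
        flush.run();
        // 3. Checkpoint the snapshot, not the live offsets.
        lastCheckpoint = Collections.unmodifiableMap(snapshot);
    }

    Map<String, Long> lastCheckpoint() {
        return lastCheckpoint;
    }

    public static void main(String[] args) {
        SnapshotCommitSketch task = new SnapshotCommitSketch();
        task.recordProcessed("p0", 41L);
        // Simulate thread B completing offset 42 while the flush is in progress.
        task.commit(() -> task.recordProcessed("p0", 42L));
        System.out.println("first checkpoint: " + task.lastCheckpoint());
        // The next commit picks up offset 42 once its send has been flushed.
        task.commit(() -> { });
        System.out.println("second checkpoint: " + task.lastCheckpoint());
    }
}
```

      The trade-off in this sketch is that an offset recorded during a commit waits for the next commit before it is checkpointed, which can cause some reprocessing after a restart but no loss.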





              jmakes Jake Maes