      In WorkerSourceTask, finishSuccessfulFlush() is not synchronized. In one case (when the flush doesn't even have to be started), this is OK because the caller is already inside a synchronized block. The other call, however, is made outside the synchronized block.

      The result was transient failures of the system test for clean bouncing of copycat nodes. The bug doesn't cause exceptions because finishSuccessfulFlush() only swaps two maps and sets a flag to false. However, because the two maps being swapped are the ones that track outstanding messages, a message send can be starting at the same moment. If that message is added to the backlog queue and the flushing flag is then toggled, we can "lose" the message temporarily in the backlog queue. We then get a callback that logs an error because it can't find a record of the acked message (an error that, if it ever appears, should be considered critical, since it shouldn't be possible), and on the next commit the message is swapped back into place. On the subsequent commit, the flush can never complete because the message is in the outstanding list but has already been acked. This, in turn, makes it impossible to commit offsets and results in duplicate messages even under clean bounces, where we should be able to get exactly-once delivery assuming no network delays or other issues.
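
      The bookkeeping described above can be sketched as follows. This is a simplified model, not the actual WorkerSourceTask code: the class name FlushBookkeeping and the methods beginFlush(), recordSend(), recordAck(), and outstandingCount() are hypothetical, and message IDs are plain strings. The point it illustrates is that the swap and the flag reset happen under the same lock as sends and acks, so a send can never observe a half-finished swap.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the outstanding/backlog bookkeeping.
// Every method takes the same monitor, including finishSuccessfulFlush().
public class FlushBookkeeping {
    private Set<String> outstanding = new HashSet<>();
    private Set<String> backlog = new HashSet<>();
    private boolean flushing = false;

    public synchronized void beginFlush() {
        flushing = true;
    }

    // While a flush is in progress, new sends go to the backlog.
    public synchronized void recordSend(String id) {
        if (flushing) {
            backlog.add(id);
        } else {
            outstanding.add(id);
        }
    }

    // Returns false if the acked message is in neither collection,
    // which corresponds to the "can't find a record" error case.
    public synchronized boolean recordAck(String id) {
        boolean removed = outstanding.remove(id);
        if (!removed && flushing) {
            removed = backlog.remove(id);
        }
        if (flushing && outstanding.isEmpty()) {
            notifyAll(); // wake a thread waiting for the flush to drain
        }
        return removed;
    }

    // Swap the two collections and clear the flag atomically with
    // respect to recordSend() and recordAck().
    public synchronized void finishSuccessfulFlush() {
        Set<String> tmp = outstanding;
        outstanding = backlog;
        backlog = tmp;
        flushing = false;
    }

    public synchronized int outstandingCount() {
        return outstanding.size();
    }
}
```

      With the swap synchronized, a message sent during the flush lands in the backlog, becomes outstanding after the swap, and is found when its ack arrives.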

      Investigating this error made it apparent that the handling of WorkerSourceTaskThreads that do not complete quickly enough was not working properly. The ShutdownableThread should be interrupted if it does not complete in time, but logs like the following were produced:

      [2015-11-18 01:02:13,897] INFO Stopping task verifiable-source-0 (org.apache.kafka.connect.runtime.Worker)
      [2015-11-18 01:02:13,897] INFO Starting graceful shutdown of thread WorkerSourceTask-verifiable-source-0 (org.apache.kafka.connect.util.ShutdownableThread)
      [2015-11-18 01:02:13,897] DEBUG WorkerSourceTask{id=verifiable-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2015-11-18 01:02:17,901] DEBUG Submitting 1 entries to backing store (org.apache.kafka.connect.storage.OffsetStorageWriter)
      [2015-11-18 01:02:18,897] INFO Forcing shutdown of thread WorkerSourceTask-verifiable-source-0 (org.apache.kafka.connect.util.ShutdownableThread)
      [2015-11-18 01:02:18,897] ERROR Graceful stop of task WorkerSourceTask failed. (org.apache.kafka.connect.runtime.Worker)
      [2015-11-18 01:02:18,897] ERROR Failed to flush WorkerSourceTask{id=verifiable-source-0}, timed out while waiting for producer to flush outstanding messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2015-11-18 01:02:18,898] DEBUG Submitting 1 entries to backing store (org.apache.kafka.connect.storage.OffsetStorageWriter)
      [2015-11-18 01:02:18,898] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

      Actions in the background thread performing the commit continue to occur after it is supposedly interrupted (note the entries logged at 01:02:18,898, after the forced shutdown at 01:02:18,897). This is because InterruptedExceptions during the flush were being ignored (some time ago they were not even possible). Instead, any interruption by the main thread trying to shut down the thread in preparation for a rebalance should be handled by failing the commit operation and returning, so the thread can exit cleanly.
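
      A minimal sketch of the intended interrupt handling, under stated assumptions: the class InterruptibleFlush, its commit(long) and ack() methods, and the integer counter standing in for the outstanding-message map are all hypothetical and are not the actual WorkerSourceTask API. The wait for outstanding messages treats an InterruptedException as a failed commit and returns immediately instead of swallowing it and continuing. Restoring the interrupt flag before returning is a choice made in this sketch, not something the description above prescribes.

```java
// Hypothetical model of a commit that waits for outstanding messages
// to be acked, with a timeout, and fails fast when interrupted.
public class InterruptibleFlush {
    private int outstanding;
    private boolean flushing = false;

    public InterruptibleFlush(int outstanding) {
        this.outstanding = outstanding;
    }

    // Called from the producer callback when a message is acked.
    public synchronized void ack() {
        if (outstanding > 0) {
            outstanding--;
        }
        if (flushing && outstanding == 0) {
            notifyAll();
        }
    }

    // Returns true if the flush completed, false on timeout or interrupt.
    public synchronized boolean commit(long timeoutMs) {
        flushing = true;
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (outstanding > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                flushing = false;
                return false; // timed out waiting for acks
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                // Fail the commit and return so the thread can exit,
                // rather than ignoring the interrupt and carrying on.
                flushing = false;
                Thread.currentThread().interrupt();
                return false;
            }
        }
        flushing = false;
        return true;
    }

    public static void main(String[] args) {
        // Simulate a shutdown request arriving before the flush drains.
        Thread.currentThread().interrupt();
        boolean committed = new InterruptibleFlush(1).commit(60_000);
        Thread.interrupted(); // clear the flag before exiting the demo
        System.out.println("committed = " + committed);
    }
}
```

      In this model, a commit that is interrupted while a message is still outstanding returns false without waiting out the 60-second timeout.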




