Description
In Connect's WorkerSinkTask, we do sequence number validation to ensure that offset commits are handled in the right order (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).
Unfortunately, for asynchronous commits, the lastCommittedOffsets field is overwritten regardless of this sequence check, as long as the response had no error (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284):
    OffsetCommitCallback cb = new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
            if (error == null) {
                lastCommittedOffsets = offsets;
            }
            onCommitCompleted(error, seqno);
        }
    };
Hence, if a commit completes out of order, lastCommittedOffsets is overwritten with the offsets of a stale commit and the task's internal state becomes inconsistent. To fix this, we should update lastCommittedOffsets only after the sequence number has been validated, as part of the onCommitCompleted(...) method.
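The proposed fix could look roughly like the sketch below. It is a simplified, standalone model and not the actual WorkerSinkTask code: the class name SinkCommitStateSketch, the beginCommit() helper, the extra offsets parameter on onCommitCompleted, and the use of String keys in place of TopicPartition are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the commit bookkeeping (not the real WorkerSinkTask).
class SinkCommitStateSketch {
    // Sequence number of the most recently issued commit request.
    private int commitSeqno = 0;
    // Offsets of the last commit that was accepted; String keys stand in for TopicPartition.
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Issue a new commit and return the sequence number its callback should capture.
    int beginCommit() {
        return ++commitSeqno;
    }

    // Completion handler. The offsets are passed in so that they are only
    // recorded after the sequence number has been validated.
    // Returns true if the committed offsets were applied.
    boolean onCommitCompleted(Exception error, int seqno, Map<String, Long> committedOffsets) {
        if (seqno != commitSeqno) {
            // Stale, out-of-order completion: leave lastCommittedOffsets untouched.
            return false;
        }
        if (error != null) {
            // The current commit failed: nothing to record.
            return false;
        }
        lastCommittedOffsets = new HashMap<>(committedOffsets);
        return true;
    }

    Map<String, Long> lastCommittedOffsets() {
        return lastCommittedOffsets;
    }

    public static void main(String[] args) {
        SinkCommitStateSketch state = new SinkCommitStateSketch();
        int first = state.beginCommit();
        int second = state.beginCommit();

        // The newer commit completes first and is applied.
        state.onCommitCompleted(null, second, Collections.singletonMap("topic-0", 20L));
        // The older commit completes late and is ignored.
        state.onCommitCompleted(null, first, Collections.singletonMap("topic-0", 10L));

        System.out.println(state.lastCommittedOffsets()); // prints {topic-0=20}
    }
}
```

With the assignment moved behind the sequence check, a late callback from an earlier commit can no longer roll lastCommittedOffsets back to older offsets.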