When we fixed
KAFKA-12226, we made offset commits for source tasks take place without blocking for any in-flight records to be acknowledged. While a task is running, this change should yield significant benefits in some cases and allow us to continue to commit offsets even when a topic partition on the broker is unavailable or the producer is unable to send records to Kafka as quickly as they are produced by the task.
However, this becomes problematic when a task is scheduled for shutdown with in-flight records. During shutdown, the latest committable offsets are calculated, and then flushed to the offset backing store (in distributed mode, this is the offsets topic). During that flush, the task's producer may continue to send records to Kafka, but their offsets will not be committed, which causes these records to be redelivered if/when the task is restarted.
Essentially, duplicate records are now possible even in healthy source tasks.