Description
Because the initial SinkTask code was written against an early version of the new consumer, it wasn't set up to handle rebalances properly. Now that we have added the rebalance listener, we can use it to commit offsets correctly. However, the existing code also has two other issues. First, when the sink task fails to flush data, we do not rewind to the last committed offsets. We need to rewind because we cannot be sure what happened to the outstanding data, so it has to be reprocessed.
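A minimal sketch of the rewind-on-failure behavior described above. It does not use the real consumer or SinkTask classes: FakeConsumer, Sink, and flushAndCommit are hypothetical names introduced only for illustration, and partitions are modeled as plain strings.

```java
import java.util.HashMap;
import java.util.Map;

class RewindOnFlushFailure {
    /** Hypothetical stand-in for a consumer; it only tracks the position per partition. */
    static class FakeConsumer {
        final Map<String, Long> positions = new HashMap<>();

        void seek(String partition, long offset) {
            positions.put(partition, offset);
        }
    }

    /** Hypothetical sink whose flush may fail. */
    interface Sink {
        void flush(Map<String, Long> offsets) throws Exception;
    }

    /**
     * Flushes the sink. On success the current offsets become the committed ones.
     * On failure the consumer is rewound to the last committed offsets so that
     * the outstanding data is read and processed again.
     *
     * @return true if the flush succeeded and the offsets were committed
     */
    static boolean flushAndCommit(Sink sink,
                                  FakeConsumer consumer,
                                  Map<String, Long> current,
                                  Map<String, Long> lastCommitted) {
        try {
            sink.flush(current);
        } catch (Exception e) {
            // We cannot know how much of the outstanding data reached the sink,
            // so go back to the last offsets that are known to be safe.
            for (Map.Entry<String, Long> entry : lastCommitted.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue());
            }
            current.clear();
            current.putAll(lastCommitted);
            return false;
        }
        lastCommitted.clear();
        lastCommitted.putAll(current);
        return true;
    }
}
```

The point of the sketch is only that a failed flush moves the read position backward instead of leaving it where it was; with the real consumer the rewind would be a seek on each assigned partition.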
Second, flushing when stopping was not being handled properly. The existing code assumed that the flush would happen as part of SinkTask.stop(). However, this did not make sense, since SinkTask.stop() was being invoked before the worker thread was stopped, so we could end up committing the wrong offsets. Instead, we need to wait for the worker thread to finish whatever it is currently doing, do one final flush and offset commit, and only then invoke stop() to allow the task to do final cleanup. This is a bit confusing because stop means different things for source and sink tasks, since they have pull vs push semantics.
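The shutdown ordering described above could be sketched roughly as follows. This is an illustration under assumptions, not the actual worker code: the Task interface, the Worker class, and the commitOffsets callback are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SinkShutdownOrder {
    /** Hypothetical, simplified view of a sink task. */
    interface Task {
        void put(String record);
        void flush();
        void stop();
    }

    /** Hypothetical worker that pushes records to the task on its own thread. */
    static class Worker {
        private final Task task;
        private final Runnable commitOffsets;
        private final AtomicBoolean stopping = new AtomicBoolean(false);
        private final Thread thread;

        Worker(Task task, Runnable commitOffsets) {
            this.task = task;
            this.commitOffsets = commitOffsets;
            this.thread = new Thread(() -> {
                int next = 0;
                while (!stopping.get()) {
                    task.put("record-" + next++);
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }

        void start() {
            thread.start();
        }

        void shutdown() {
            // 1. Ask the worker thread to stop and wait for it to finish
            //    whatever it is currently doing.
            stopping.set(true);
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for worker thread", e);
            }
            // 2. No more records can arrive now, so a final flush and commit
            //    cover exactly what the task has received.
            task.flush();
            commitOffsets.run();
            // 3. Only then let the task do its final cleanup.
            task.stop();
        }
    }
}
```

Joining the thread before the final flush is what rules out committing offsets for records that the task has not yet seen.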