The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods exposed in Kafka Connect's SinkTask interface seems a little unclear and too closely tied to consumer semantics. From the javadoc, these APIs are used to open/close per-partition resources, but that would suggest that we should always get one call to onPartitionsAssigned() before writing any records for the corresponding partitions and one call to onPartitionsRevoked() when we have finished with them. However, the same methods on the consumer are used to indicate phases of the rebalance operation: onPartitionsRevoked() is called before the rebalance begins and onPartitionsAssigned() is called after it completes. In particular, the consumer does not guarantee a final call to onPartitionsRevoked().
This mismatch makes the contract of these methods unclear. In fact, the WorkerSinkTask currently guarantees neither the initial call to onPartitionsAssigned() nor the final call to onPartitionsRevoked(). Instead, the task implementation must pull the initial assignment from the SinkTaskContext. To add to the confusion, the offset commit that follows onPartitionsRevoked() triggers a flush() on partitions that have already been revoked. All of this makes it difficult to use this API as suggested in the javadocs.
To fix this, we should clarify the behavior of these methods and consider renaming them to avoid confusion with the same methods in the consumer API. If onPartitionsAssigned() is meant for opening resources, maybe we can rename it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We can then fix the code to ensure that a typical open/close contract is enforced. This would also mean removing the need to pass the initial assignment in the SinkTaskContext. This would give the following API:
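A minimal sketch of what the renamed callbacks might look like, assuming open() and close() each receive the collection of affected partitions. The nested TopicPartition class is a stand-in for org.apache.kafka.common.TopicPartition so that the example compiles on its own, and the PartitionLifecycle and TrackingTask names are hypothetical, chosen only for illustration.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class OpenCloseSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: keeps the sketch self-contained).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Hypothetical shape of the renamed callbacks.
    interface PartitionLifecycle {
        // Called before any records for these partitions are written.
        void open(Collection<TopicPartition> partitions);

        // Called once the task has finished with these partitions.
        void close(Collection<TopicPartition> partitions);
    }

    // Example task that only tracks which partitions currently hold resources.
    static class TrackingTask implements PartitionLifecycle {
        final Set<TopicPartition> openPartitions = new HashSet<>();

        @Override
        public void open(Collection<TopicPartition> partitions) {
            openPartitions.addAll(partitions);
        }

        @Override
        public void close(Collection<TopicPartition> partitions) {
            openPartitions.removeAll(partitions);
        }
    }

    public static void main(String[] args) {
        TrackingTask task = new TrackingTask();
        task.open(List.of(new TopicPartition("orders", 0), new TopicPartition("orders", 1)));
        task.close(List.of(new TopicPartition("orders", 0)));
        System.out.println("still open: " + task.openPartitions.size());
    }
}
```

With a contract like this, the framework would be responsible for pairing every open() with a later close(), so a task would no longer need to read its initial assignment from the SinkTaskContext.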
We could also consider going a little further. Instead of depending on onPartitionsAssigned() to open resources, tasks could open partition resources on demand as records are received. In general, connectors will need some way to close partition-specific resources, but there might not be any need to pass the full list of partitions to close since the only open resources should be those that have received writes since the last rebalance. In this case, we just have a single method:
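A rough sketch of this alternative, assuming a no-argument close() and a put() that opens resources lazily. The TopicPartition and Record classes are simplified stand-ins for the real Kafka types, and LazyTask, with an in-memory list playing the role of a per-partition writer, is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class LazyOpenSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition (assumption).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Simplified stand-in for a sink record: just a partition and a value.
    static final class Record {
        final TopicPartition topicPartition;
        final String value;

        Record(TopicPartition topicPartition, String value) {
            this.topicPartition = topicPartition;
            this.value = value;
        }
    }

    static class LazyTask {
        // One "writer" per partition, created the first time a record arrives for it.
        private final Map<TopicPartition, List<String>> writers = new HashMap<>();

        void put(Collection<Record> records) {
            for (Record record : records) {
                writers.computeIfAbsent(record.topicPartition, tp -> new ArrayList<>())
                       .add(record.value);
            }
        }

        // The single lifecycle method: releases everything opened since the last rebalance.
        void close() {
            writers.clear();
        }

        int openCount() {
            return writers.size();
        }
    }

    public static void main(String[] args) {
        LazyTask task = new LazyTask();
        TopicPartition p0 = new TopicPartition("orders", 0);
        task.put(List.of(new Record(p0, "a"), new Record(p0, "b")));
        System.out.println("open before close: " + task.openCount());
        task.close();
        System.out.println("open after close: " + task.openCount());
    }
}
```

Because the task itself knows which partitions it has written to, close() needs no argument here; the cost is that a partition assigned but never written to is invisible to the task.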
The downside to this is that the difference between close() and stop() then becomes a little unclear.
These changes are not backward compatible, so existing connectors would have to be updated.