Consider a sink task that is assigned two topic partitions. One of them runs into a transient issue, so no more of its data can be processed, while the other topic partition is proceeding fine. We don't want to block the second partition by constantly throwing exceptions because of data from the first.
The new consumer now supports pause/resume, so we should expose this to the task. We could expose the functionality directly, although that would make the task responsible for scheduling its own future check of whether it can resume. Another approach is to have the API take a backoff time. The framework would then automatically resume consumption of the topic partition after that time, which would presumably prompt the task to reevaluate the situation for that topic partition when it receives another message for it.
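
To make the backoff variant concrete, here is a rough sketch of the bookkeeping the framework might do. Everything in it is hypothetical: the class name BackoffPauseTracker, its methods, and the use of plain "topic-partition" strings as keys are assumptions for illustration, not an existing or proposed Kafka API. It only tracks resume deadlines; the actual pause and resume calls on the consumer are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: remembers which partitions are
// paused and when the framework should resume each of them.
final class BackoffPauseTracker {
    // Keyed by a "topic-partition" string to keep the sketch dependency-free.
    private final Map<String, Long> resumeAtMs = new HashMap<>();

    // Invoked on behalf of the task: pause a partition for backoffMs from nowMs.
    void pause(String topicPartition, long nowMs, long backoffMs) {
        if (backoffMs < 0) {
            throw new IllegalArgumentException("backoffMs must be non-negative");
        }
        resumeAtMs.put(topicPartition, nowMs + backoffMs);
    }

    // True while the partition is still waiting for its backoff to elapse.
    boolean isPaused(String topicPartition) {
        return resumeAtMs.containsKey(topicPartition);
    }

    // Invoked by the framework before each poll: returns the partitions whose
    // backoff has elapsed and forgets them, so they are resumed only once.
    List<String> expire(long nowMs) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = resumeAtMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                due.add(entry.getKey());
                it.remove();
            }
        }
        return due;
    }
}
```

Under this sketch, the worker loop would call expire(now) before each poll and resume the returned partitions on the consumer. The task never schedules anything itself; it only requests a pause with a backoff and gets another chance to decide when the next message for that partition arrives.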