Today, when a task run by a WorkerSinkTask calls context.requestCommit(), the next call to iteration() triggers the commit. As part of executing the commit, it also pushes the nextCommit time (in milliseconds) further out.
This leads to odd behavior when a SinkTask calls context.requestCommit() multiple times. In our case, we called requestCommit() whenever the number of Kafka records we had processed exceeded a threshold. As a result, nextCommit ended up several days in the future, so the task committed only when the record threshold was reached.
We expected the task to commit when the record threshold was reached OR when the timer went off.
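
The drift can be reproduced with a small standalone model. This is only a sketch, not the actual Kafka Connect code: the class CommitTimerSketch, the 60-second interval, and the resetFromNow flag are all hypothetical. It assumes that each commit adds one commit interval to nextCommit (which would explain the behavior we saw), and contrasts that with one possible alternative where nextCommit is recomputed from the current time.

```java
/** Simplified model of the commit scheduling described above; not the real WorkerSinkTask. */
class CommitTimerSketch {
    // Hypothetical commit interval, standing in for the configured offset commit interval.
    static final long COMMIT_INTERVAL_MS = 60_000L;

    long nextCommit;          // time (ms) at which the timer-based commit is due
    boolean commitRequested;  // set by requestCommit(), cleared after a commit
    int commits;              // number of commits performed so far

    CommitTimerSketch(long startMs) {
        nextCommit = startMs + COMMIT_INTERVAL_MS;
    }

    void requestCommit() {
        commitRequested = true;
    }

    /** One pass of the loop: commit if one was requested or the timer has expired. */
    void iteration(long nowMs, boolean resetFromNow) {
        if (commitRequested || nowMs >= nextCommit) {
            commits++;
            if (resetFromNow) {
                // Possible alternative: schedule the next timed commit relative to now.
                nextCommit = nowMs + COMMIT_INTERVAL_MS;
            } else {
                // Assumed current behavior: every commit adds one interval,
                // even when the commit was requested early.
                nextCommit += COMMIT_INTERVAL_MS;
            }
            commitRequested = false;
        }
    }

    /** Requests a commit once per simulated second; returns how far nextCommit is ahead of the clock. */
    static long driftAfterRequests(int requests, boolean resetFromNow) {
        CommitTimerSketch timer = new CommitTimerSketch(0L);
        long now = 0L;
        for (int i = 0; i < requests; i++) {
            now += 1_000L;
            timer.requestCommit();
            timer.iteration(now, resetFromNow);
        }
        return timer.nextCommit - now;
    }

    public static void main(String[] args) {
        System.out.println("additive drift (ms):   " + driftAfterRequests(1000, false));
        System.out.println("reset-from-now (ms):   " + driftAfterRequests(1000, true));
    }
}
```

Under these assumptions, 1000 requested commits spaced one second apart leave nextCommit about 16.4 hours ahead of the clock in the additive case, while the reset-from-now variant stays exactly one interval ahead, so the timer can still fire between threshold-driven commits.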