KAFKA-5567 describes how offset commits for sink connectors are broken if a record's topic-partition is mutated by an SMT, e.g RegexRouter or TimestampRouter.
This is also a problem for sink connectors that manage their own offsets, i.e. those that store offsets elsewhere and call SinkTaskContext.rewind(). In this case, the transformation has already been applied by the time the SinkTask sees it, so there is no way it could correctly track offsets and call rewind() with valid values. For example, this would make the offset tracking that Confluent's HDFS connector does by working with filenames no longer work. Even if they were stored separately in a file rather than relying on filenames, it still wouldn't have ever had the correct offsets to write to that file.
There are a couple of options:
1. Decide that this is an acceptable consequence of combining SMTs with sink connectors and it's a limitation we accept. You can either transform the data via Kafka Streams instead or accept that you can't do these "routing" type operations in the sink connector unless it supports it natively. This might not be the wrong choice since we think there are very few connectors that track their own offsets. In the case of HDFS, we might rarely hit this issue because it supports its own file/directory partitioning schemes anyway so doing this via SMTs isn't as necessary there.
2. Try to expose the original record information to the sink connector via the records. I can think of 2 ways this could be done. The first is to attach the original record to each SinkRecord. The cost here is relatively high in terms of memory, especially for sink connectors that need to buffer data. The second is to add fields to SinkRecords for originalTopic() and originalPartition(). This feels a bit ugly to me but might be the least intrusive change API-wise and we can guarantee those fields aren't overwritten by not allowing public constructors to set them.
3. Try to expose the original record information to the sink connector via a new pre-processing callback. The idea is similar to preCommit, but instead would happen before any processing occurs. Taken to its logical conclusion this turns into a sort of interceptor interface (preConversion, preTransformation, put, and preCommit).
4. Add something to the Context that allows the connector to get back at the original information. Maybe some sort of IdentityMap<Record, Record> originalPutRecords() that would let you get a mapping back to the original records. One nice aspect of this is that the connector can hold onto the original only if it needs it.
5. A very intrusive change/extension to the SinkTask API that passes in pairs of <original, transformed> records. Accomplishes the same as 2 but requires what I think are more complicated changes. Mentioned for completeness.
6. Something else I haven't thought of?