Description
The consumer record that is used during punctuate is static. This can cause java.util.ConcurrentModificationException when the headers are modified.
Using a single instance of ConsumerRecord for all punctuates causes other strange behavior:
- Headers are shared across partitions.
- A topology that adds a single header could append an unbounded number of headers (one per punctuate iteration). This causes memory problems in the current topology as well as in downstream consumers, since the headers are written with the record when it is produced to a topic.
I would expect that each invocation of punctuate would be initialized with a new header object.
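The accumulation described above can be illustrated with a minimal sketch that does not depend on Kafka. It is only a simulation under stated assumptions: FakeRecord, SHARED, punctuate and headerCountAfter are hypothetical names made up for illustration, and a plain list of strings stands in for the real headers object.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuateHeaderSketch {
    // Hypothetical stand-in for the record handed to punctuate; not a Kafka class.
    static final class FakeRecord {
        final List<String> headers = new ArrayList<>();
    }

    // Mirrors the reported behavior: one static instance reused for every punctuate call.
    private static final FakeRecord SHARED = new FakeRecord();

    // Hypothetical punctuator body that adds a single header per invocation.
    static void punctuate(FakeRecord record) {
        record.headers.add("trace-id");
    }

    // Runs the punctuator the given number of times and returns how many
    // headers the record seen by the last invocation ends up carrying.
    static int headerCountAfter(int punctuations, boolean shareRecord) {
        SHARED.headers.clear(); // start each simulation from a clean state
        FakeRecord last = SHARED;
        for (int i = 0; i < punctuations; i++) {
            last = shareRecord ? SHARED : new FakeRecord();
            punctuate(last);
        }
        return last.headers.size();
    }

    public static void main(String[] args) {
        System.out.println("shared record: " + headerCountAfter(5, true));
        System.out.println("fresh record:  " + headerCountAfter(5, false));
    }
}
```

With the shared instance the header count grows with every punctuate call (5 after five calls), while a fresh record per invocation keeps it at 1, which is the behavior expected above.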