Details
-
Bug
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
1.13.1
Description
Using the sink-buffer-flush-max-rows and sink-buffer-flush-interval options for a kafka sink produces a lot of duplicate key/values in the target kafka topic. Maybe the BufferedUpsertSinkFunction should clone the buffered key/value RowData objects, but it doesn’t. Seems like in line 134 the condition should be negated or the ternary operator results swapped:
this.valueCopier =
getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
? Function.identity()
: typeSerializer::copy;
(in the jdbc sink the same logic is done but the ternary operator results swapped)