Flink / FLINK-23025

sink.buffer-flush.max-rows and sink.buffer-flush.interval options produce a lot of duplicates


Details

    Description

      Using the sink.buffer-flush.max-rows and sink.buffer-flush.interval options on an upsert-kafka sink produces a lot of duplicate key/value pairs in the target Kafka topic. The BufferedUpsertSinkFunction should probably clone the buffered key/value RowData objects, but it doesn't. It seems that in line 134 the condition should be negated, or the branches of the ternary operator swapped:

      this.valueCopier =
              getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
                      ? Function.identity()
                      : typeSerializer::copy;

      (The JDBC sink uses the same logic, but with the branches of the ternary operator swapped, i.e. it copies when object reuse is enabled.)
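      To illustrate why buffering must copy when the runtime reuses record objects, here is a minimal, self-contained Java sketch. It is not Flink code: Row is a hypothetical mutable stand-in for RowData, and the copier(...) helper mirrors the ternary selection from the snippet above (with the branches in the corrected order, so that copying happens when objects are reused).

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.function.Function;

      public class ReuseBufferDemo {
          // Hypothetical mutable record, standing in for Flink's RowData.
          static final class Row {
              String key;
              Row(String key) { this.key = key; }
          }

          // Corrected copier selection: when the runtime reuses (and mutates)
          // the same instance, the buffering sink must copy on insert; when
          // every record is a fresh object, identity is safe.
          static Function<Row, Row> copier(boolean objectReuseEnabled) {
              return objectReuseEnabled
                      ? r -> new Row(r.key)   // stands in for typeSerializer::copy
                      : Function.identity();
          }

          // Simulates a runtime that mutates one shared instance between
          // records, and a sink that buffers each record before flushing.
          static List<String> bufferedKeys(boolean copyRecords) {
              Function<Row, Row> copier = copier(copyRecords);
              List<Row> buffer = new ArrayList<>();
              Row reused = new Row("k1");        // the single reused instance
              buffer.add(copier.apply(reused));
              reused.key = "k2";                 // runtime overwrites it in place
              buffer.add(copier.apply(reused));
              List<String> keys = new ArrayList<>();
              for (Row r : buffer) keys.add(r.key);
              return keys;
          }

          public static void main(String[] args) {
              // With copying, both buffered records survive intact.
              System.out.println(bufferedKeys(true));
              // Without copying, the buffer holds the same mutated object
              // twice, so the earlier record is silently corrupted.
              System.out.println(bufferedKeys(false));
          }
      }
      ```

      In the buggy version, the identity path is taken exactly when objects are reused, so the key/value rows sitting in the buffer change under the sink's feet; corrupted buffered keys plausibly break the upsert deduplication and lead to the duplicates observed in the topic.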


      Attachments

        Activity

          People

            fsk119 Shengkai Fang
            joemoe Joe Moser
            Votes:
            0
            Watchers:
            6

            Dates

              Created:
              Updated:
              Resolved: