Uploaded image for project: 'Bahir (Retired)'
  1. Bahir (Retired)
  2. BAHIR-283

InfluxDBWriter fails to write the final element in each element

    XMLWordPrintableJSON

Details

    Description

      {{ /**

      • This method calls the InfluxDB write API whenever the element list reaches the {@link * #bufferSize}

        . It keeps track of the latest timestamp of each element. It compares the latest

      • timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
        *
      • @param in incoming data
      • @param context current Flink context
      • @see org.apache.flink.api.connector.sink.SinkWriter.Context
        */
        @Override
        public void write(final IN in, final Context context) throws IOException {
        if (this.elements.size() == this.bufferSize) { LOG.debug("Buffer size reached preparing to write the elements."); this.writeCurrentElements(); this.elements.clear(); }

        else {
        LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
        this.elements.add(this.schemaSerializer.serialize(in, context));
        if (context.timestamp() != null)

        { this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp()); }

        }
        }}}

      The bug is in this write method. If the number of elements in the buffer is less than the configured buffer size, the current element is added to the buffer. If the number of elements in the buffer is equal to the buffer size, the buffer is flushed and the current element is not added to the next buffer. This results in the current element being dropped.

      Attachments

        Issue Links

          Activity

            People

              dquig David Quigley
              dquig David Quigley
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - 2h
                  2h
                  Remaining:
                  Remaining Estimate - 2h
                  2h
                  Logged:
                  Time Spent - Not Specified
                  Not Specified