Kafka / KAFKA-14659

source-record-write-[rate|total] metrics include filtered records


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.5.0, 3.4.1, 3.3.3
    • Component/s: connect
    • Labels: None

    Description

      Source tasks in Kafka Connect offer two sets of metrics (documented in ConnectMetricsRegistry.java):

      Metric                     Description
      source-record-poll-rate    The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.
      source-record-write-rate   The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.

      There are also corresponding "-total" metrics that capture the total number of records polled and written for the metrics above, respectively.

      In short, the "poll" metrics capture the number of records sourced before transformation/filtering, and the "write" metrics should capture the number of records ultimately written to Kafka after transformation/filtering. However, the implementation of the source-record-write-* metrics also counts records that were filtered out by transformations, as well as records whose produce attempts failed when errors.tolerance=all is configured.

      Details

      In AbstractWorkerSourceTask.java, each source record is passed through the transformation chain, which may filter it out. The task then checks whether the record was filtered out (or failed), and if so accounts for it in the internal metrics via counter.skipRecord().

      for (final SourceRecord preTransformRecord : toSend) {
          retryWithToleranceOperator.sourceRecord(preTransformRecord);
          final SourceRecord record = transformationChain.apply(preTransformRecord);
          // A filtered-out record surfaces here as a null producerRecord.
          final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
          if (producerRecord == null || retryWithToleranceOperator.failed()) {
              // Filtered or failed records are reported to the write counter as skipped.
              counter.skipRecord();
              recordDropped(preTransformRecord);
              continue;
          }
          ...
      

      SourceRecordWriteCounter.skipRecord() is implemented as follows:

          ....
          public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
              assert batchSize > 0;
              assert metricsGroup != null;
              this.batchSize = batchSize;
              counter = batchSize;
              this.metricsGroup = metricsGroup;
          }
          public void skipRecord() {
              // Only the pending count is decremented; nothing marks the record as not written.
              if (counter > 0 && --counter == 0) {
                  finishedAllWrites();
              }
          }
          ....
          private void finishedAllWrites() {
              if (!completed) {
                  // When reached via skipRecord(), counter is 0, so the full batchSize is
                  // recorded as written, skipped records included.
                  metricsGroup.recordWrite(batchSize - counter);
                  completed = true;
              }
          }
      

      For example, if a batch starts with 100 records, batchSize and counter are both initialized to 100. If all 100 records are filtered out, counter is decremented 100 times, and finishedAllWrites() records the value 100 (batchSize - counter = 100 - 0) to the underlying source-record-write-* metrics rather than 0, which is the correct value according to the documentation for these metrics.
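
      The behaviour in this example can be reproduced outside of Connect with a minimal sketch. The counter class below copies the logic quoted above; FakeMetricsGroup and writesRecordedWhenAllFiltered() are hypothetical stand-ins introduced only for this illustration and are not part of Kafka.

```java
class WriteCounterDemo {

    /** Hypothetical stand-in for SourceTaskMetricsGroup: sums whatever recordWrite() receives. */
    static class FakeMetricsGroup {
        long recordedWrites = 0;

        void recordWrite(int recordCount) {
            recordedWrites += recordCount;
        }
    }

    /** Copy of the counter logic quoted in this issue, wired to the fake metrics group. */
    static class SourceRecordWriteCounter {
        private final FakeMetricsGroup metricsGroup;
        private final int batchSize;
        private boolean completed = false;
        private int counter;

        SourceRecordWriteCounter(int batchSize, FakeMetricsGroup metricsGroup) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.metricsGroup = metricsGroup;
        }

        void skipRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        private void finishedAllWrites() {
            if (!completed) {
                // counter is 0 here, so the whole batch is reported as written.
                metricsGroup.recordWrite(batchSize - counter);
                completed = true;
            }
        }
    }

    /** Skips every record in a batch and returns what the metrics group was told. */
    static long writesRecordedWhenAllFiltered(int batchSize) {
        FakeMetricsGroup metrics = new FakeMetricsGroup();
        SourceRecordWriteCounter counter = new SourceRecordWriteCounter(batchSize, metrics);
        for (int i = 0; i < batchSize; i++) {
            counter.skipRecord();
        }
        return metrics.recordedWrites;
    }

    public static void main(String[] args) {
        // Every record is filtered, yet the recorded write count equals the batch size.
        System.out.println(writesRecordedWhenAllFiltered(100)); // prints 100
    }
}
```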

      Solutions

      Assuming the documentation correctly captures the intent of the source-record-write-* metrics, it seems reasonable to fix these metrics so that filtered records are not counted.
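
      One possible shape for such a fix, as a rough sketch only and not necessarily the change that should be merged: count acknowledged writes separately from skipped records and report only the former. FilterAwareWriteCounter, FakeMetricsGroup, completeRecord() as written here, and recordedWrites() are all hypothetical names for this illustration.

```java
class FilterAwareWriteCounterDemo {

    /** Hypothetical stand-in for SourceTaskMetricsGroup: sums whatever recordWrite() receives. */
    static class FakeMetricsGroup {
        long recordedWrites = 0;

        void recordWrite(int recordCount) {
            recordedWrites += recordCount;
        }
    }

    /** Hypothetical counter that reports only records that were actually written. */
    static class FilterAwareWriteCounter {
        private final FakeMetricsGroup metricsGroup;
        private int remaining;        // records neither written nor skipped yet
        private int written = 0;      // records acknowledged as written
        private boolean completed = false;

        FilterAwareWriteCounter(int batchSize, FakeMetricsGroup metricsGroup) {
            this.remaining = batchSize;
            this.metricsGroup = metricsGroup;
        }

        /** Called when a record has been written successfully. */
        void completeRecord() {
            written++;
            settleOne();
        }

        /** Called when a record was filtered out or dropped; it does not count as written. */
        void skipRecord() {
            settleOne();
        }

        private void settleOne() {
            if (remaining > 0 && --remaining == 0 && !completed) {
                // Report only the writes, not the full batch size.
                metricsGroup.recordWrite(written);
                completed = true;
            }
        }
    }

    /** Runs one batch in which `filtered` records are skipped and the rest are written. */
    static long recordedWrites(int batchSize, int filtered) {
        FakeMetricsGroup metrics = new FakeMetricsGroup();
        FilterAwareWriteCounter counter = new FilterAwareWriteCounter(batchSize, metrics);
        for (int i = 0; i < filtered; i++) {
            counter.skipRecord();
        }
        for (int i = filtered; i < batchSize; i++) {
            counter.completeRecord();
        }
        return metrics.recordedWrites;
    }

    public static void main(String[] args) {
        System.out.println(recordedWrites(100, 100)); // prints 0
        System.out.println(recordedWrites(100, 40));  // prints 60
    }
}
```

      With this accounting, a fully filtered batch of 100 records contributes 0 to the write metrics, matching the documented behaviour.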

      It may also be useful to add additional metrics to capture the rate and total number of records filtered out by transformations, which would require a KIP.

      I'm not yet sure of the best way to account for produce failures when errors.tolerance=all. Maybe these failures deserve their own new metrics?

            People

              Assignee: hgeraldino Hector Geraldino
              Reporter: cbeard Chris Beard