Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-14597

[Streams] record-e2e-latency-max is not reporting correct metrics

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • metrics, streams
    • None

    Description

      I was following this KIP documentation (https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams) and kafka streams documentation (https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end) . Based on these documentations , the record-e2e-latency-max should monitor the full end to end latencies, which includes both consumption latencies and  processing delays.

      However, based on my observations , record-e2e-latency-max seems to be only measuring the consumption latencies. processing delays can be measured using process-latency-max .I am checking all this using a simple topology consisting of source, processors and sink (code added). I have added some sleep time (of 3 seconds) in one of the processors to ensure some delays in the processing logic. These delays are not getting accounted in the record-e2e-latency-max but are accounted in process-latency-max. process-latency-max was observed to be 3002 ms which accounts for sleep time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, which does not account for 3 seconds of sleep time.

       

      Code describing my topology:

         static Topology buildTopology(String inputTopic, String outputTopic) {
              log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
      
              Serde<String> stringSerde = Serdes.String();
              StreamsBuilder builder = new StreamsBuilder();
              builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
                      .peek((k,v) -> log.info("Observed event: key" + k + " value: " + v))
                      .mapValues(s -> {
                          try {
                              System.out.println("sleeping for 3 seconds");
                              Thread.sleep(3000);
                          }
                          catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          return  s.toUpperCase();
                      })
                      .peek((k,v) -> log.info("Transformed event: key" + k + " value: " + v))
                      .to(outputTopic, Produced.with(stringSerde, stringSerde));
              return builder.build();
          } 

       

      Attachments

        1. image-2023-03-21-15-07-24-352.png
          6 kB
          Atul Jain
        2. image-2023-03-21-19-01-54-713.png
          18 kB
          Atul Jain
        3. image-2023-03-21-19-03-07-525.png
          20 kB
          Atul Jain
        4. image-2023-03-21-19-03-28-625.png
          21 kB
          Atul Jain
        5. process-latency-max.jpg
          365 kB
          Atul Jain
        6. record-e2e-latency-max.jpg
          258 kB
          Atul Jain

        Activity

          People

            talestonini Tales Tonini
            atuljainiitk Atul Jain
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated: