Kafka / KAFKA-5980

FailOnInvalidTimestamp does not log error

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.1, 0.11.0.1
    • Fix Version/s: 1.0.0
    • Component/s: streams
    • Labels: None

    Description

      In the following block of code from the StreamThread class, an error caused by an invalid timestamp is re-thrown, and the comment says that it should already have been logged. However, this particular error is NOT logged anywhere and only shows up on standard error.

          /**
           * Execute the stream processors
           *
           * @throws KafkaException for any Kafka-related exceptions
           * @throws Exception      for any other non-Kafka exceptions
           */
          @Override
          public void run() {
              log.info("{} Starting", logPrefix);
      
              try {
                  runLoop();
                  cleanRun = true;
              } catch (KafkaException e) {
                  // just re-throw the exception as it should be logged already
                  throw e;
              } catch (Exception e) {
                  // we have caught all Kafka related exceptions, and other runtime exceptions
                  // should be due to user application errors
                  log.error("{} Streams application error during processing: ", logPrefix, e);
                  throw e;
              } finally {
                  shutdown();
              }
          }
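
      One way to make such a failure visible in the application log is to log it at the point where it is raised, before throwing. The following is a minimal, dependency-free sketch of that pattern, not the actual Kafka Streams code and not necessarily the fix applied for this issue. The class LoggingFailOnInvalidTimestampSketch, its validate method, the use of java.util.logging, and IllegalStateException (standing in for StreamsException) are all assumptions made for illustration.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for a timestamp check; NOT the Kafka Streams FailOnInvalidTimestamp class.
class LoggingFailOnInvalidTimestampSketch {
    private static final Logger LOG =
            Logger.getLogger("LoggingFailOnInvalidTimestampSketch");

    /** Returns the timestamp if it is non-negative; otherwise logs the problem and then throws. */
    static long validate(long recordTimestamp) {
        if (recordTimestamp < 0) {
            String message =
                    "Input record has invalid (negative) timestamp: " + recordTimestamp;
            // Log first, so the error reaches the application log even if the
            // caller only re-throws it without logging.
            LOG.severe(message);
            // IllegalStateException is an assumption standing in for StreamsException.
            throw new IllegalStateException(message);
        }
        return recordTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(validate(42L));
        try {
            validate(-1L);
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

      With this arrangement, a catch block that only re-throws (as in run() above) can rely on the comment "it should be logged already" actually being true.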
      

      Message from standard error:

      Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(...) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
          at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)
          at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:60)
          at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46)
          at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:86)
          at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
          at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
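
      The "Exception in thread ..." line above is the form the JVM prints to standard error when a thread terminates with an uncaught exception and no handler is installed. As a hedged illustration of how an application could route such a failure to its own log, the sketch below installs a Thread.UncaughtExceptionHandler on a plain Java thread. The class UncaughtExceptionLoggingSketch, the method runAndCapture, and the use of java.util.logging are hypothetical names and choices for this example, and the thread is not a real StreamThread.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class; it uses a plain Thread, not Kafka's StreamThread.
class UncaughtExceptionLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger("UncaughtExceptionLoggingSketch");

    /**
     * Runs the work on a new thread whose uncaught-exception handler logs the
     * failure. Returns the throwable that killed the thread, or null if the
     * work completed normally.
     */
    static Throwable runAndCapture(Runnable work) {
        final Throwable[] captured = new Throwable[1];
        Thread t = new Thread(work, "StreamThread-1");
        t.setUncaughtExceptionHandler((thread, error) -> {
            // Goes to the logger instead of the JVM's default stderr output.
            LOG.log(Level.SEVERE, "Thread " + thread.getName() + " died", error);
            captured[0] = error;
        });
        t.start();
        try {
            t.join(); // the handler runs on the dying thread, before join() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return captured[0];
    }

    public static void main(String[] args) {
        Throwable failure = runAndCapture(() -> {
            throw new IllegalStateException("invalid (negative) timestamp");
        });
        System.out.println("captured: " + failure);
    }
}
```

      A handler like this is a safety net only: it records the failure after the thread has already died, so logging at the point of failure is still preferable.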
      

            People

              Assignee: Matthias J. Sax (mjsax)
              Reporter: Michael Rylander (msrylander)