Kafka / KAFKA-9310

StreamThread may die from recoverable UnknownProducerId exception

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.1
    • Component/s: streams
    • Labels:
      None

      Description

      In KAFKA-9231, we attempted to catch and recover from UnknownProducerId exceptions, but the exception can still be raised wrapped in a KafkaException, in which case it escapes the handler and kills the StreamThread.

      For example, see the following stack trace:

      [2019-12-17 00:08:53,064] ERROR [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] stream-thread [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
        org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031, topic=windowed-node-counts, partition=1, offset=354933575, stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (timestamp 1575857317197) to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
      	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
      	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
      	at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
      	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
      	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
      	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
      	at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
      	at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
      	at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
      	at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
      	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
      	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
      	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
      	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
      	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
       Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
      	at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
      	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
      	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
      	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
      	... 29 more
       Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
      
      	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
      	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
      	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
       Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (timestamp 1575857317197) to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
      	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
      	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
      	at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
      	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
      	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
      	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
      	at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
      	at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
      	at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
      	at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
      	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
      	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
      	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
      	... 5 more
       Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
      	at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
      	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
      	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
      	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
      	... 29 more
       Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
       [2019-12-17 00:08:53,066] INFO [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] stream-thread [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
      
      

      The catch blocks should be updated to also handle the exception when it arrives in this wrapped form.
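A minimal sketch of the fix direction: instead of matching only the top-level exception type, walk the cause chain and treat the error as recoverable if an UnknownProducerIdException appears anywhere in it. The class below is self-contained for illustration, so `UnknownProducerIdException` here is a placeholder standing in for `org.apache.kafka.common.errors.UnknownProducerIdException`; the real catch block in RecordCollectorImpl/StreamThread would use the actual Kafka class.

```java
// Sketch: detect a recoverable UnknownProducerIdException even when it arrives
// wrapped (e.g. StreamsException -> KafkaException -> UnknownProducerIdException),
// as in the stack trace above. Placeholder class, not the real Kafka exception.
public class CauseChainCheck {

    static class UnknownProducerIdException extends RuntimeException {
        UnknownProducerIdException(String msg) { super(msg); }
    }

    // Returns true if any throwable in the cause chain is an
    // UnknownProducerIdException, regardless of how deeply it is wrapped.
    static boolean isRecoverable(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof UnknownProducerIdException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulate the wrapped form seen in the stack trace:
        // a generic KafkaException-like wrapper around the real cause.
        Throwable wrapped = new RuntimeException(
                "Cannot perform send because at least one previous request failed",
                new UnknownProducerIdException("producer metadata not found"));

        System.out.println(isRecoverable(wrapped));                       // true
        System.out.println(isRecoverable(new RuntimeException("other"))); // false
    }
}
```

A catch block using this check could then retry or re-initialize the producer instead of transitioning the thread to PENDING_SHUTDOWN.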

              People

              • Assignee:
                John Roesler (vvcephei)
              • Reporter:
                John Roesler (vvcephei)
              • Votes:
                0
              • Watchers:
                4
