Kafka / KAFKA-10247

Streams may attempt to process after closing a task


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.6.0
    • Fix Version/s: 2.6.0
    • Component/s: streams
    • Labels: None

    Description

      Observed in a system test. A corrupted task was detected, and Streams properly closed it as dirty:

      [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
      org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
         at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
         at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
      org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be re-initialized
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
         at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
         at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
         ... 3 more
      [2020-07-08 17:08:09,346] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
      [2020-07-08 17:08:09,346] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing its state manager and all the registered state stores: {sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 : SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null, cntStoreName=StateStoreMetadata (cntStoreName : SmokeTest-cntStoreName-changelog-1 @ 0} (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
      [2020-07-08 17:08:09,346] INFO [Consumer clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer, groupId=null] Subscribed to partition(s): SmokeTest-minStoreName-changelog-1, SmokeTest-minStoreName-changelog-2, SmokeTest-sum-STATE-STORE-0000000050-changelog-0, SmokeTest-minStoreName-changelog-3, SmokeTest-sum-STATE-STORE-0000000050-changelog-2, SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0, SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2, SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4 (org.apache.kafka.clients.consumer.KafkaConsumer)
      [2020-07-08 17:08:09,348] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released state dir lock for task 2_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
      [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing record collector dirty (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
      [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask)

      However, there were already records buffered for it, so later in the same processing loop, Streams tried to process that task anyway, resulting in an InvalidStateStoreException:

      [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to process stream task 2_1 due to the following error: (org.apache.kafka.streams.processor.internals.TaskManager)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
         at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
         at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
         at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
         at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
         at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
         at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
      [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)

      This caused the entire thread to shut down.


      Instead, Streams should not attempt to process tasks that are no longer running.
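      The fix described above amounts to checking a task's state before processing its buffered records. The following is a minimal illustrative sketch of that guard, not the actual Kafka Streams implementation; all class, method, and state names here are hypothetical stand-ins for the internal `TaskManager`/`StreamTask` machinery:

```java
import java.util.ArrayList;
import java.util.List;

public class TaskGuardSketch {
    // Simplified stand-in for the task lifecycle states.
    enum State { RUNNING, CLOSED }

    static class Task {
        final String id;
        State state;

        Task(String id, State state) {
            this.id = id;
            this.state = state;
        }

        boolean isProcessable() {
            return state == State.RUNNING;
        }

        String process() {
            // Mirrors the bug: touching a closed task's store throws.
            if (state == State.CLOSED) {
                throw new IllegalStateException("Store is currently closed.");
            }
            return "processed " + id;
        }
    }

    // Process only tasks that are still running. A task that was closed
    // dirty earlier in the same loop (e.g. due to TaskCorruptedException)
    // is skipped instead of throwing and killing the thread.
    static List<String> processAll(List<Task> tasks) {
        List<String> results = new ArrayList<>();
        for (Task task : tasks) {
            if (!task.isProcessable()) {
                continue; // closed dirty earlier; will be re-created later
            }
            results.add(task.process());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
            new Task("0_1", State.RUNNING),
            new Task("2_1", State.CLOSED)); // corrupted, closed dirty
        System.out.println(processAll(tasks)); // only 0_1 is processed
    }
}
```

      Without the `isProcessable()` check, processing task 2_1 would throw and shut the thread down, as in the logs above; with it, the closed task is simply skipped until it is re-created and restored.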


            People

              Assignee: John Roesler (vvcephei)
              Reporter: John Roesler (vvcephei)
