Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10247

Streams may attempt to process after closing a task

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.6.0
    • Fix Version/s: 2.6.0
    • Component/s: streams
    • Labels:
      None

      Description

      Observed in a system test. A corrupted task was detected, and Stream properly closed it as dirty:

      [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
      org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
         at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
         at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch. (org.apache.kafka.streams.processor.internals.StreamThread)
      org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be re-initialized
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
         at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
         at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
         ... 3 more
      [2020-07-08 17:08:09,346] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
      [2020-07-08 17:08:09,346] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing its state manager and all the registered state stores: {sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 : SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null, cntStoreName=StateStoreMetadata (cntStoreName : SmokeTest-cntStoreName-changelog-1 @ 0} (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
      [2020-07-08 17:08:09,346] INFO [Consumer clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer, groupId=null] Subscribed to partition(s): SmokeTest-minStoreName-changelog-1, SmokeTest-minStoreName-changelog-2, SmokeTest-sum-STATE-STORE-0000000050-changelog-0, SmokeTest-minStoreName-changelog-3, SmokeTest-sum-STATE-STORE-0000000050-changelog-2, SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0, SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2, SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4 (org.apache.kafka.clients.consumer.KafkaConsumer)
      [2020-07-08 17:08:09,348] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released state dir lock for task 2_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
      [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing record collector dirty (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
      [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask)

      However, there were already records buffered for it, so later on in the same processing loop, Streams tried to process that task, resulting in an IllegalStateException:

      [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to process stream task 2_1 due to the following error: (org.apache.kafka.streams.processor.internals.TaskManager)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
         at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
         at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
         at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
      org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
         at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
         at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
         at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
         at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
         at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
         at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
         at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
      [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
      [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)

      Which caused the entire thread to shut down.

       

      Instead, we should not attempt to process tasks that are not running.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                vvcephei John Roesler
                Reporter:
                vvcephei John Roesler
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: