Kafka / KAFKA-7443

OffsetOutOfRangeException in restoring state store from changelog topic when start offset of local checkpoint is smaller than that of changelog topic


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
    • Fix Version/s: 1.1.2, 2.2.0, 2.1.1, 2.0.2
    • Component/s: streams
    • Labels:

      Description

      When restoring a local state store from a changelog topic with exactly-once semantics (EOS) enabled, Kafka Streams sometimes throws an OffsetOutOfRangeException such as:

      Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.
      org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {AuditTrailBatch_PROD3-Dedup-key-store-changelog-32=75465112}

      at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:950)
      at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:470)
      at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1249)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1157)
      at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:89)
      at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:765)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:734) 

      This happens when the changelog topic has deleted expired log segments according to retention.ms, while the start offset in the local .checkpoint file is still the position recorded when the task last exited on this instance. That offset may be smaller than the changelog topic's new beginning offset, so restoring the store from the checkpointed offset throws the exception.

      It can be reproduced as follows (with Kafka Streams running in EOS mode):

      1. The task for topic partition test-1 is running on instance A. When the task exits, Kafka Streams writes the last committed offset, 100, for test-1 to the checkpoint file.
      2. Task test-1 is migrated to instance B.
      3. Meanwhile, the remote changelog topic for test-1 advances its start offset to 120, because the old log segment reaches its retention time and is deleted.
      4. After a while, task test-1 leaves instance B and resumes on instance A. The task restores A's local state store from the checkpointed offset 100, which is smaller than the changelog topic's earliest valid offset 120, and the exception is thrown.
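
      The steps above boil down to a comparison between the checkpointed offset and the changelog's current offset range. The following is a minimal, dependency-free sketch of that check. The class CheckpointRangeCheck and its method are hypothetical and not part of Kafka Streams, and the end offset 200 is an assumed value used only for illustration.

```java
public class CheckpointRangeCheck {

    /**
     * Returns true when a fetch starting at checkpointOffset would be out of range,
     * i.e. the offset lies outside [logStartOffset, logEndOffset].
     */
    public static boolean isOutOfRange(long checkpointOffset, long logStartOffset, long logEndOffset) {
        return checkpointOffset < logStartOffset || checkpointOffset > logEndOffset;
    }

    public static void main(String[] args) {
        long checkpoint = 100L;     // step 1: written by instance A when the task exits
        long logStartAfter = 120L;  // step 3: retention deleted the segment holding the older offsets
        long logEnd = 200L;         // assumed end offset, for illustration only

        // Before retention removes the old segment, the checkpoint is still valid.
        System.out.println(isOutOfRange(checkpoint, 100L, logEnd));
        // Step 4: after the start offset moved to 120, the checkpoint is out of range.
        System.out.println(isOutOfRange(checkpoint, logStartAfter, logEnd));
    }
}
```

      With the numbers from the reproduction, the first check reports an in-range offset and the second reports an out-of-range one, which corresponds to the point where the restore consumer fails.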

      When this exception occurs, Kafka Streams tries to reinitialize the task and intends to restore from the beginning in the catch block below. Unfortunately, this handling does not work, and the task keeps throwing OffsetOutOfRangeException on subsequent restore attempts.

      // org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
      // Handling of OffsetOutOfRangeException in Kafka Streams
      
      catch (final InvalidOffsetException recoverableException) {
       log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
       final Set<TopicPartition> partitions = recoverableException.partitions();
       for (final TopicPartition partition : partitions) {
         final StreamTask task = active.restoringTaskFor(partition);
         log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
      
         needsInitializing.remove(partition);
         needsRestoring.remove(partition);
      
         task.reinitializeStateStoresForPartitions(recoverableException.partitions());
       }
       restoreConsumer.seekToBeginning(partitions);
      }

       

      Investigating why this handling does not work, I found the root cause:

      Kafka Streams registers state restorers in the variable stateRestorers, which is used to read and update the start and end offsets for restoring the local state store.

      //org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
      
      private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();

      When the OffsetOutOfRangeException occurs, Kafka Streams should set the checkpoint offset in the state restorer for this topic partition to NO_CHECKPOINT, so that on the next attempt the task restores from the beginning offset of the remote changelog topic and the issue is resolved.

      But in the catch block above, task.reinitializeStateStoresForPartitions(recoverableException.partitions()) does not actually update the checkpoint offset in stateRestorers, so the next attempt still resumes from the original invalid offset and gets stuck on this exception.

      I fixed this bug by resetting the checkpoint offset of the affected stateRestorer, and I verified that the change resolves the issue. The modified code is below:

      catch (final InvalidOffsetException recoverableException) {
       log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
       final Set<TopicPartition> partitions = recoverableException.partitions();
       for (final TopicPartition partition : partitions) {
         final StreamTask task = active.restoringTaskFor(partition);
         log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
      
         needsInitializing.remove(partition);
         needsRestoring.remove(partition);
      
         // added by linyli: reset the checkpoint so the next restore starts from the beginning of the changelog
         final StateRestorer restorer = stateRestorers.get(partition);
         restorer.setCheckpointOffset(StateRestorer.NO_CHECKPOINT);
      
         task.reinitializeStateStoresForPartitions(recoverableException.partitions());
       }
       restoreConsumer.seekToBeginning(partitions);
      }
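
      To illustrate why resetting the checkpoint matters, the sketch below models the restore retry in plain Java. RestoreRetrySketch, Restorer, and restoreAttempts are hypothetical stand-ins, not Kafka Streams classes. Only the constant name NO_CHECKPOINT mirrors the fix above, and its value of -1 here is an assumption made for this sketch.

```java
public class RestoreRetrySketch {

    // Sentinel meaning "no usable checkpoint"; the value is an assumption for this sketch.
    static final long NO_CHECKPOINT = -1L;

    /** Hypothetical stand-in for a per-partition state restorer. */
    static final class Restorer {
        long checkpointOffset;

        Restorer(long checkpointOffset) {
            this.checkpointOffset = checkpointOffset;
        }

        /** Offset the next restore attempt starts from. */
        long startingOffset(long logStartOffset) {
            return checkpointOffset == NO_CHECKPOINT ? logStartOffset : checkpointOffset;
        }
    }

    /**
     * Counts the attempts needed until a restore starts at a valid offset,
     * or returns -1 if it still fails after maxAttempts.
     */
    public static int restoreAttempts(long checkpoint, long logStartOffset,
                                      boolean resetCheckpointOnFailure, int maxAttempts) {
        Restorer restorer = new Restorer(checkpoint);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (restorer.startingOffset(logStartOffset) >= logStartOffset) {
                return attempt; // fetch position is valid, the restore can proceed
            }
            // Models the catch block: the out-of-range fetch has failed.
            if (resetCheckpointOnFailure) {
                restorer.checkpointOffset = NO_CHECKPOINT; // the proposed fix
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Without the reset, the stale checkpoint 100 is reused on every attempt.
        System.out.println(restoreAttempts(100L, 120L, false, 5));
        // With the reset, the retry starts from the log start offset 120.
        System.out.println(restoreAttempts(100L, 120L, true, 5));
    }
}
```

      In this model the retry without the reset never reaches a valid starting offset, while the retry with the reset succeeds on the attempt that follows the failure. The model ignores everything else the real catch block does, such as wiping the local store.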

      Any comments on this issue are welcome.

        Attachments

        1. KAFKA-7443.url
          0.1 kB
          linyue li


              People

              • Assignee:
                linyli linyue li
                Reporter:
                linyli linyue li