  1. Kafka
  2. KAFKA-6860

NPE when reinitializeStateStores with eos enabled

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.1, 2.0.0
    • Component/s: streams
    • Labels:
      None
    • Environment:
      mac, kafka1.1

      Description

      Symptom
      With EOS enabled, reinitializing the state stores throws an NPE because the checkpoint is null.

      2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] Encountered the following error during processing:
      java.lang.NullPointerException: null
              at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) ~[kafka-streams-1.1.0.jar:na]
              at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) ~[kafka-streams-1.1.0.jar:na]
              at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) ~[kafka-streams-1.1.0.jar:na]
              at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[kafka-streams-1.1.0.jar:na]
      

      How to reproduce

      Configure as follows:

      • changelog topic with a short `retention.ms` and the `delete` cleanup policy (just to reproduce the symptom easily)
        e.g. retention.ms=30000,cleanup.policy=delete
      • exactly-once semantics (EOS) enabled
      • no cleanup
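
The client-side part of this setup can be sketched with plain `java.util.Properties`. This is a minimal sketch, not the reporter's actual configuration: the application id and bootstrap address are placeholder assumptions, and only `processing.guarantee=exactly_once` corresponds to the "EOS enabled" item above.

```java
import java.util.Properties;

final class EosReproConfigSketch {

    // Builds Kafka Streams client settings for the reproduction.
    // The application id and bootstrap address are placeholders (assumptions),
    // not values taken from the report.
    static Properties streamsProperties() {
        final Properties props = new Properties();
        props.setProperty("application.id", "kafka-stream-application");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Enables exactly-once semantics (EOS), the precondition for the NPE.
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(final String[] args) {
        // Print the settings so they can be compared with the running application.
        streamsProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would typically be passed to the `KafkaStreams` constructor together with the topology.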

      Steps

      • two tasks, [0_0] and [0_1], on two Spring Boot instances (assignment: was#1 gets task[0_0], was#2 gets task[0_1])
      • write some data to each state store (the changelog topic will soon delete those messages because of the short `retention.ms`)
      • kill was#2; was#1 then restores task[0_1]'s data into its own RocksDB
      • during that restoration, the state manager accesses the checkpoint, which is null, and the error occurs (AbstractStateManager, line 66)
      // My code
      // Topic-level settings applied to the state store's changelog topic
      Map<String, String> topicConfiguration = new HashMap<>();
      topicConfiguration.putIfAbsent("cleanup.policy", "delete");
      topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
      topicConfiguration.putIfAbsent("retention.ms", "3000");
      
      builder.stream(properties.getSourceTopic(),
                     Consumed.with(Serdes.Long(), Serdes.String()))
             .groupByKey()
             .count(Materialized
                        .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
                        .withKeySerde(Serdes.Long())
                        .withValueSerde(Serdes.Long())
                        .withLoggingEnabled(topicConfiguration));
      

      Suggestion

      When EOS is enabled, the checkpoint is null at this point.
      I think some code needs to be added to create the checkpoint before it is written,
      as follows:

      // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
      // # suggestion start
      if (checkpoint == null) {
          checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
      }
      // # suggestion end
      
      try {
          checkpoint.write(checkpointableOffsets);
      } catch (final IOException fatalException) {
          log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
          throw new StreamsException("Failed to reinitialize global store.", fatalException);
      }
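
The null guard suggested above can be tried in isolation with the standalone sketch below. It does not use Kafka classes: `FileCheckpoint`, `CheckpointGuardSketch`, `persistOffsets`, and the `.checkpoint` file name are hypothetical stand-ins chosen for illustration, and the sketch only demonstrates the lazy-creation pattern, not the fix that was eventually merged.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.LinkedHashMap;
import java.util.Map;

final class CheckpointGuardSketch {

    // Hypothetical stand-in for a checkpoint file writer; NOT Kafka's OffsetCheckpoint.
    static final class FileCheckpoint {
        private final File file;

        FileCheckpoint(final File file) {
            this.file = file;
        }

        // Writes one "name offset" pair per line.
        void write(final Map<String, Long> offsets) throws IOException {
            final StringBuilder sb = new StringBuilder();
            for (final Map.Entry<String, Long> entry : offsets.entrySet()) {
                sb.append(entry.getKey()).append(' ').append(entry.getValue()).append('\n');
            }
            Files.write(file.toPath(), sb.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    private final File baseDir;
    // Starts out null, mirroring the state described in the report.
    private FileCheckpoint checkpoint;

    CheckpointGuardSketch(final File baseDir) {
        this.baseDir = baseDir;
    }

    // Creates the checkpoint on first use instead of dereferencing null.
    File persistOffsets(final Map<String, Long> offsets) {
        if (checkpoint == null) {
            checkpoint = new FileCheckpoint(new File(baseDir, ".checkpoint"));
        }
        try {
            checkpoint.write(offsets);
        } catch (final IOException fatalException) {
            throw new UncheckedIOException("Failed to write checkpoint file", fatalException);
        }
        return checkpoint.file;
    }

    public static void main(final String[] args) throws IOException {
        final File dir = Files.createTempDirectory("checkpoint-sketch").toFile();
        final Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("changelog-0", 42L);
        final File written = new CheckpointGuardSketch(dir).persistOffsets(offsets);
        System.out.print(new String(Files.readAllBytes(written.toPath()), StandardCharsets.UTF_8));
    }
}
```

Without the `if (checkpoint == null)` block, the first call to `persistOffsets` would fail with a `NullPointerException`, which is the same shape of failure as the stack trace in the Symptom section.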
      

              People

              • Assignee:
                mjsax Matthias J. Sax
                Reporter:
                bk.ko ko byoung kwon

                  Time Tracking

                  Estimated: 2h
                  Remaining: 2h
                  Logged: Not Specified