With EOS enabled, reinitializing the state stores throws an NPE because the checkpoint is null.
2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] Encountered the following error during processing:
java.lang.NullPointerException: null
at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions( ~[kafka-streams-1.1.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions( ~[kafka-streams-1.1.0.jar:na]
at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions( ~[kafka-streams-1.1.0.jar:na]
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore( ~[kafka-streams-1.1.0.jar:na]
How to reproduce
Configure as follows:
- changelog topic with a short `` and the `delete` policy (just to reproduce the symptom easily)
  ex) cleanup.policy=delete
- exactly-once semantics enabled
- no cleanup
- two tasks, [0_0] and [0_1], and two Spring Boot instances (assignment: was#1: task[0_0], was#2: task[0_1])
- write some data to each state store (the changelog topic will soon erase those messages because of the short "")
- when was#2 is killed, was#1 restores task[0_1]'s data into its own RocksDB
- during this restore, it accesses the checkpoint and the error occurs (AbstractStateManager #66)
// My code
Map<String, String> topicConfiguration = new HashMap<>();
topicConfiguration.putIfAbsent("cleanup.policy", "delete");
topicConfiguration.putIfAbsent("", "0");
topicConfiguration.putIfAbsent("", "3000");
, Consumed.with(Serdes.Long(), Serdes.String()))
    .groupByKey()
    .count(Materialized
        .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.Long())
        .withLoggingEnabled(topicConfiguration));
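The "exactly-once semantics enabled" step in the list above could be set up roughly as in the following sketch. It uses plain string keys so that it compiles without the Kafka jars; `processing.guarantee` with the value `exactly_once` is the Streams setting behind `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`. The application id and broker address are assumptions for illustration, not values taken from this report.

```java
import java.util.Properties;

public class EosReproConfig {

    // Builds the Streams properties assumed for the reproduction.
    static Properties streamsProperties() {
        Properties props = new Properties();
        // Assumed values, for illustration only.
        props.put("application.id", "kafka-stream-application");
        props.put("bootstrap.servers", "localhost:9092");
        // Enables exactly-once processing (EOS).
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to the KafkaStreams constructor.
        System.out.println(streamsProperties());
    }
}
```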
When EOS is enabled, the checkpoint will be null.
I think some code needs to be added to create the checkpoint,
as follows:
// # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
// # suggestion start
if (checkpoint == null) {
    checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
}
// # suggestion end
try {
    checkpoint.write(checkpointableOffsets);
} catch (final IOException fatalException) {
    log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
    throw new StreamsException("Failed to reinitialize global store.", fatalException);
}
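To show the idea of the suggested guard in isolation, here is a minimal, self-contained sketch. It does not use the Kafka classes: `SimpleCheckpoint` and `LazyCheckpointSketch` are hypothetical stand-ins for `OffsetCheckpoint` and the state manager, and the file name and line format are assumptions. It only demonstrates creating the checkpoint lazily when it is null before writing offsets. Whether this is the right fix for Kafka Streams itself is a separate question.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LazyCheckpointSketch {
    // Assumed file name, for illustration only.
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    // Hypothetical stand-in for OffsetCheckpoint: one "name offset" line per entry.
    static final class SimpleCheckpoint {
        private final File file;

        SimpleCheckpoint(File file) {
            this.file = file;
        }

        void write(Map<String, Long> offsets) throws IOException {
            List<String> lines = new ArrayList<>();
            for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
                lines.add(e.getKey() + " " + e.getValue());
            }
            Files.write(file.toPath(), lines, StandardCharsets.UTF_8);
        }
    }

    private final File baseDir;
    private SimpleCheckpoint checkpoint; // may be null, as reported with EOS enabled

    LazyCheckpointSketch(File baseDir, SimpleCheckpoint checkpoint) {
        this.baseDir = baseDir;
        this.checkpoint = checkpoint;
    }

    void writeOffsets(Map<String, Long> offsets) throws IOException {
        // The guard from the suggestion: create the checkpoint if it is missing.
        if (checkpoint == null) {
            checkpoint = new SimpleCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
        }
        checkpoint.write(offsets);
    }

    // Runs the null-checkpoint case and reports whether the file was written.
    static boolean demo() {
        try {
            File dir = Files.createTempDirectory("state").toFile();
            LazyCheckpointSketch manager = new LazyCheckpointSketch(dir, null);
            Map<String, Long> offsets = new TreeMap<>();
            offsets.put("changelog-0", 42L);
            manager.writeOffsets(offsets);
            return new File(dir, CHECKPOINT_FILE_NAME).exists();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the guard, `writeOffsets` would fail with a NullPointerException on the `checkpoint.write(...)` call when the manager is constructed with a null checkpoint, which mirrors the failure in the stack trace.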