Description
When the state of an in-memory KeyValue store is restored (at startup of the Kafka Streams application), deleted records are put into the store as keys with a null value instead of being removed from the store.
(This happens when the underlying Kafka topic segment has not been compacted yet, so the delete markers are still present in the topic.)
After some digging I came across this in InMemoryKeyValueStore<K, V>:
public synchronized void put(K key, V value) { this.map.put(key, value); }
I assume this implementation is missing a null check on value: a null value should delete the entry instead of being stored.
In the RocksDB implementation it is done correctly:
if (rawValue == null) { try { db.delete(wOptions, rawKey);
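A minimal sketch of what such a null check could look like in an in-memory store. `TombstoneAwareStore` and its helper methods are hypothetical stand-ins for illustration, not the actual Kafka Streams class, and the `main` method only imitates a restore by replaying a put followed by a delete marker:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an in-memory key-value store (not the Kafka Streams class).
class TombstoneAwareStore<K, V> {
    private final Map<K, V> map = new HashMap<>();

    // A null value is treated as a delete marker: remove the key instead of storing null.
    public synchronized void put(K key, V value) {
        if (value == null) {
            map.remove(key);
        } else {
            map.put(key, value);
        }
    }

    public synchronized V get(K key) {
        return map.get(key);
    }

    public synchronized boolean containsKey(K key) {
        return map.containsKey(key);
    }

    public synchronized int size() {
        return map.size();
    }

    public static void main(String[] args) {
        TombstoneAwareStore<String, String> store = new TombstoneAwareStore<>();
        // Replay of an uncompacted changelog: two writes, then a delete marker for "a".
        store.put("a", "1");
        store.put("b", "2");
        store.put("a", null);
        System.out.println(store.size());           // prints 1
        System.out.println(store.containsKey("a")); // prints false
    }
}
```

With the original `put`, the same replay would leave "a" in the map with a null value, so the store would report two entries.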