Description
I have created a very simple kafka-streams application with one state store. I'm very surprised that the {{block-cache-capacity}} metric shows a 100MB block cache capacity instead of the Kafka Streams default of 50MB.
My topology:
StreamsBuilder sb = new StreamsBuilder();
sb.stream("input")
    .groupByKey()
    .count()
    .toStream()
    .to("output");
I checked out the kafka-streams code and noticed something strange. When a RocksDBTimestampedStore is created, it opens two column families, for backward compatibility with a potential old key/value store.
In setDbAccessor(col1, col2), if the first column family is empty (its iterator is not valid after seekToFirst()), its handle is closed (L102). But the column family does not seem to be removed from the RocksDB instance, and the metrics exposed by RocksDB keep aggregating (L373) {{block-cache-capacity}} over both column families (default and keyValueWithTimestamp), which would explain the 100MB value (2 × 50MB).
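The arithmetic behind the 100MB value can be illustrated with a small, self-contained model. This is not Kafka Streams or RocksDB code: Cache and ColumnFamily are hypothetical stand-ins, and the model assumes (as the doubled value suggests) that both column families are configured with the same 50MB block cache. Summing the capacity per column family reports the shared cache twice, while counting each distinct cache once gives the real capacity.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class BlockCacheCapacityModel {

    // Hypothetical stand-in for a RocksDB block cache; only its capacity matters here.
    static final class Cache {
        final long capacityBytes;

        Cache(long capacityBytes) {
            this.capacityBytes = capacityBytes;
        }
    }

    // Hypothetical stand-in for a column family whose table config points at a block cache.
    static final class ColumnFamily {
        final String name;
        final Cache cache;

        ColumnFamily(String name, Cache cache) {
            this.name = name;
            this.cache = cache;
        }
    }

    // Adds up the capacity reported by every column family, as in the aggregation described above.
    static long sumPerColumnFamily(List<ColumnFamily> columnFamilies) {
        long total = 0;
        for (ColumnFamily cf : columnFamilies) {
            total += cf.cache.capacityBytes;
        }
        return total;
    }

    // Counts each distinct cache object once, so a cache shared by several column families is not double counted.
    static long sumPerDistinctCache(List<ColumnFamily> columnFamilies) {
        Set<Cache> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        long total = 0;
        for (ColumnFamily cf : columnFamilies) {
            if (seen.add(cf.cache)) {
                total += cf.cache.capacityBytes;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumption: both column families share one 50MB cache (the Kafka Streams default size).
        Cache shared = new Cache(50L * 1024 * 1024);
        List<ColumnFamily> columnFamilies = List.of(
                new ColumnFamily("default", shared),
                new ColumnFamily("keyValueWithTimestamp", shared));

        System.out.println(sumPerColumnFamily(columnFamilies));  // 104857600 bytes (100MB)
        System.out.println(sumPerDistinctCache(columnFamilies)); // 52428800 bytes (50MB)
    }
}
```

Counting distinct caches is only one possible way a metrics recorder could avoid the double counting; unlike dropping a column family, it does not depend on which column families exist.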
Maybe the column family has to be dropped explicitly in setDbAccessor(col1, col2) when the first column family is empty (e.g. db.dropColumnFamily(noTimestampColumnFamily);).
I tried to drop noTimestampColumnFamily in setDbAccessor when the first column family is empty, like this:
private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
                           final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
    final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
    noTimestampsIter.seekToFirst();
    if (noTimestampsIter.isValid()) {
        // old key/value data present: read from both column families
        log.info("Opening store {} in upgrade mode", name);
        dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
    } else {
        // no old data: only the timestamped column family is used
        log.info("Opening store {} in regular mode", name);
        dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
        // attempted fix: drop the column family before closing its handle
        db.dropColumnFamily(noTimestampColumnFamily);
        noTimestampColumnFamily.close();
    }
    noTimestampsIter.close();
}
But it seems that RocksDB does not allow dropping the default column family (see screenshot).
So how can we get the real {{block-cache-capacity}} metric value in Kafka Streams monitoring?