Description
Sitting in a tight loop and calling store.all(); store.close(); shows that memory is being leaked.
Digging into it a bit more, when we use a KeyValueStore with a cache, and we call store.all, the cache does:
def all() = { metrics.alls.inc flush() store.all() }
In turn, flush() does:
def flush() { trace("Flushing.") metrics.flushes.inc // write out the contents of the dirty list oldest first val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount) for (k <- this.dirty.reverse) { val entry = this.cache.get(k) entry.dirty = null // not dirty any more batch.add(new Entry(k, entry.value)) } store.putAll(batch) store.flush metrics.flushBatchSize.inc(batch.size) // reset the dirty list this.dirty = new mutable.DoubleLinkedList[K]() this.dirtyCount = 0 }
The store.putAll in this code, calls LevelDbKeyValueStore.putAll, which has:
def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) { val batch = db.createWriteBatch() val iter = entries.iterator var wrote = 0 var deletes = 0 while (iter.hasNext) { wrote += 1 val curr = iter.next() if (curr.getValue == null) { deletes += 1 batch.delete(curr.getKey) } else { val key = curr.getKey val value = curr.getValue metrics.bytesWritten.inc(key.size + value.size) batch.put(key, value) } } db.write(batch) batch.close metrics.puts.inc(wrote) metrics.deletes.inc(deletes) }
According to the docs on https://github.com/fusesource/leveldbjni, the batch needs to be close!
WriteBatch batch = db.createWriteBatch(); try { batch.delete(bytes("Denver")); batch.put(bytes("Tampa"), bytes("green")); batch.put(bytes("London"), bytes("red")); db.write(batch); } finally { // Make sure you close the batch to avoid resource leaks. batch.close(); }
This should be a one-line fix.