Description
A common usage pattern with Samza stores is to periodically clear out the DB. Right now, we call store.all(), iterate over every key, and delete it.
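For reference, the clear-out loop looks roughly like this (a sketch against the org.apache.samza.storage.kv API, not the actual job code; treat the exact signatures as from memory):

import org.apache.samza.storage.kv.KeyValueStore

// Sketch of the periodic clear-out pattern: iterate over everything and delete each key.
def clearStore[K, V](store: KeyValueStore[K, V]) {
  val iter = store.all
  try {
    while (iter.hasNext) {
      store.delete(iter.next.getKey)
    }
  } finally {
    iter.close
  }
}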
While using this style of processing, we noticed that the store.all call gets increasingly slow as we delete more keys from it:
This code runs in 5 seconds:
@Test
def testPerformance {
  System.out.println("Performance test for store: %s" format typeOfStore)
  val numLoops = 100
  val messagesPerBatch = 10000
  val stuff = b((0 until 200).map(i => "a").mkString)
  val start = System.currentTimeMillis
  (0 until numLoops).foreach(i => {
    System.out.println("(%sms) Total in store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch))
    (0 until messagesPerBatch).foreach(j => {
      val k = b((i * j).toString)
      store.put(k, stuff)
      // store.delete(k)
    })
    val allStart = System.currentTimeMillis
    val iter = store.all
    System.out.println("(%s) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
    iter.close
  })
  System.out.println("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
}
Prints:
Performance test for store: cache
(2ms) Total in store: 0
(627) all() took 5ms.
(628ms) Total in store: 10000
(767) all() took 0ms.
(767ms) Total in store: 20000
(848) all() took 0ms.
(848ms) Total in store: 30000
(915) all() took 0ms.
(916ms) Total in store: 40000
(982) all() took 0ms.
(982ms) Total in store: 50000
(1046) all() took 0ms.
(1046ms) Total in store: 60000
(1100) all() took 0ms.
(1101ms) Total in store: 70000
....
(5360ms) Total in store: 940000
(5406) all() took 1ms.
(5406ms) Total in store: 950000
(5447) all() took 0ms.
(5447ms) Total in store: 960000
(5499) all() took 0ms.
(5499ms) Total in store: 970000
(5558) all() took 0ms.
(5558ms) Total in store: 980000
(5605) all() took 0ms.
(5605ms) Total in store: 990000
(5654) all() took 1ms.
Total time: 5.654s
Identical code, but with the store.delete line enabled, prints:
Performance test for store: cache
(2ms) Total in store: 0
(602) all() took 6ms.
(602ms) Total in store: 10000
(744) all() took 1ms.
(745ms) Total in store: 20000
(820) all() took 2ms.
(820ms) Total in store: 30000
(894) all() took 2ms.
(895ms) Total in store: 40000
(952) all() took 3ms.
(952ms) Total in store: 50000
(1006) all() took 4ms.
(1006ms) Total in store: 60000
(1061) all() took 5ms.
(1061ms) Total in store: 70000
(1116) all() took 5ms.
(1116ms) Total in store: 80000
....
(9450) all() took 50ms.
(9451ms) Total in store: 910000
(9548) all() took 53ms.
(9549ms) Total in store: 920000
(9650) all() took 56ms.
(9650ms) Total in store: 930000
(9757) all() took 60ms.
(9757ms) Total in store: 940000
(9865) all() took 62ms.
(9866ms) Total in store: 950000
(9977) all() took 64ms.
(9978ms) Total in store: 960000
(10093) all() took 68ms.
(10093ms) Total in store: 970000
(10211) all() took 70ms.
(10211ms) Total in store: 980000
(10346) all() took 74ms.
(10346ms) Total in store: 990000
(10472) all() took 76ms.
Total time: 10.472s
The latency of all() clearly increases once deletes are added.
I dug into it a bit, and it turns out this line in LevelDbKeyValueStore is the culprit:
iter.seekToFirst()
(10478ms) Total in store: 970000
db.iterator() took 0ms. iter.seekToFirst() took 98ms.
(10606) all() took 99ms.
(10606ms) Total in store: 980000
db.iterator() took 0ms. iter.seekToFirst() took 65ms.
(10699) all() took 65ms.
(10699ms) Total in store: 990000
db.iterator() took 0ms. iter.seekToFirst() took 66ms.
(10794) all() took 66ms.
More digging shows that this is a common problem with LevelDB: deletes are written as tombstones that are only purged during compaction, so an iterator's first seek has to skip over all of the accumulated dead keys:
http://code.google.com/p/leveldb/issues/detail?id=77
So, as I see it, we have a few options:
1. Upgrade to RocksDB
2. Find a new way to clear DBs
3. Trigger compactions to clean up the dead keys in the DB
(1) is definitely the ultimate goal. If (3) is doable, though, I'd like to get it into 0.7.0, since this is a pretty common use case for us; a rough sketch is below. I'm not a fan of (2), because this problem also manifests itself when people don't want to drop the entire table, but do delete significant chunks of it periodically (e.g. deleting all expired keys from a join buffer).
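A rough sketch of what (3) might look like, assuming leveldbjni's JniDB exposes the native CompactRange as compactRange(begin, end) with nulls meaning the whole key space (compactAll is a hypothetical helper, not existing Samza code):

import org.fusesource.leveldbjni.internal.JniDB
import org.iq80.leveldb.DB

// Hypothetical helper for option (3): force a full-range compaction after a bulk
// delete so the tombstones are dropped before the next all()/seekToFirst() call.
def compactAll(db: DB) {
  db match {
    // Assumption: passing nulls compacts the entire key space, mirroring the
    // native CompactRange(NULL, NULL).
    case jni: JniDB => jni.compactRange(null, null)
    case _ => // other DB implementations: no explicit compaction hook to call
  }
}

We'd presumably want to rate-limit how often this runs, since a full compaction isn't free.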
Issue Links
- relates to SAMZA-236 Switch samza-kv to use RocksDB (Resolved)