Description
If there is a failure in SamzaContainer, and state is configured with LevelDB, we see this exception:
Exception in thread "main" org.fusesource.leveldbjni.internal.NativeDB$DBException: IO error: /mnt/u002/samsa-yarn/usercache/samza-perf-playground/appcache/application_1399672625491_0002/container_1399672625491_0002_01_000611/state/test-store-all-calls/47/LOCK: No such file or directory at org.fusesource.leveldbjni.internal.NativeDB.checkStatus(NativeDB.java:200) at org.fusesource.leveldbjni.internal.NativeDB.open(NativeDB.java:218) at org.fusesource.leveldbjni.JniDBFactory.open(JniDBFactory.java:168) at org.apache.samza.storage.kv.LevelDbKeyValueStore.db$lzycompute(LevelDbKeyValueStore.scala:71) at org.apache.samza.storage.kv.LevelDbKeyValueStore.db(LevelDbKeyValueStore.scala:71) at org.apache.samza.storage.kv.LevelDbKeyValueStore.putAll(LevelDbKeyValueStore.scala:99) at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:58) at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:161) at org.apache.samza.storage.kv.NullSafeKeyValueStore.flush(NullSafeKeyValueStore.scala:68) at org.apache.samza.storage.kv.KeyValueStorageEngine.flush(KeyValueStorageEngine.scala:117) at org.apache.samza.storage.kv.KeyValueStorageEngine.close(KeyValueStorageEngine.scala:129) at org.apache.samza.storage.kv.KeyValueStorageEngine.stop(KeyValueStorageEngine.scala:123) at org.apache.samza.storage.TaskStorageManager$$anonfun$stop$2.apply(TaskStorageManager.scala:129) at org.apache.samza.storage.TaskStorageManager$$anonfun$stop$2.apply(TaskStorageManager.scala:129) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.samza.storage.TaskStorageManager.stop(TaskStorageManager.scala:129) at org.apache.samza.container.TaskInstance.shutdownStores(TaskInstance.scala:236) at org.apache.samza.container.SamzaContainer$$anonfun$shutdownStores$2.apply(SamzaContainer.scala:660) at org.apache.samza.container.SamzaContainer$$anonfun$shutdownStores$2.apply(SamzaContainer.scala:660) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.samza.container.SamzaContainer.shutdownStores(SamzaContainer.scala:660) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:524) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:82) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
I think that this is because SamzaContainer catches the thrown exception, and then calls stop on everything. It seems that calling stop on a not-fully-initialized LevelDB JNI store can throw this exception. We should either wrap LevelDB's stop logic in a try/catch, or change the logic to check that shutting down LevelDB is safe.