Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Description
In Samza, TaskCoordinator.commit (or taskCoordinator.shutdown) creates the offset file for a task's local store.
StateStoreGC (SAMZA-1158), introduced in samza-0.12, periodically deletes stale local stores.
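Because the offset file is written only on commit or shutdown, it is effectively the signal the GC relies on to tell whether a store is still in use. The Java sketch below models that relationship under stated assumptions: the marker file name "OFFSET" and the class and method names (OffsetFileMarker, writeOffsetFile, hasOffsetFile) are hypothetical and are not Samza's API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: the offset file doubles as the "store in use" signal.
// The file name "OFFSET" and all class/method names are assumptions, not Samza's API.
final class OffsetFileMarker {
    static final String OFFSET_FILE_NAME = "OFFSET";

    private OffsetFileMarker() {}

    // Stands in for what commit/shutdown does: persist the last changelog offset
    // next to the store's data files.
    static void writeOffsetFile(Path storeDir, String offset) throws IOException {
        Files.createDirectories(storeDir);
        Files.write(storeDir.resolve(OFFSET_FILE_NAME), offset.getBytes(StandardCharsets.UTF_8));
    }

    // Stands in for the GC's staleness test: a store without an offset file looks stale.
    static boolean hasOffsetFile(Path storeDir) {
        return Files.isRegularFile(storeDir.resolve(OFFSET_FILE_NAME));
    }

    public static void main(String[] args) throws IOException {
        Path storeDir = Files.createTempDirectory("isb-state-store").resolve("Partition_231");
        Files.createDirectories(storeDir);
        // Before any commit there is no offset file, so the GC would consider the store stale.
        System.out.println("before commit: " + hasOffsetFile(storeDir));
        writeOffsetFile(storeDir, "0"); // hypothetical offset value
        System.out.println("after commit: " + hasOffsetFile(storeDir));
    }
}
```

In this model, a store that a container is restoring but has not yet committed looks exactly like an abandoned one: hasOffsetFile returns false for both.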
At LinkedIn, we found a race condition between StateStoreGC and a Samza container that is in its startup phase.
Below is an investigation detailing the events (all of them happen on the host hostname_masked_1.linkedin.com):
LocalStoreMonitor.log:
2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Job: jobName:waterloo-databus-isb-adapter jobId:i002 has the running status: stopped with preferred host: hostname_masked_2.linkedin.com.
2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Store isb-state-store not used by the task: Partition 231.
2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Deleting the task store: /export/content/data/samsa-yarn/samza-logged-stores/waterloo-databus-isb-adapter-i002/isb-state-store/Partition_231, since it has no offset file.
StateStoreGC observes three things:
- The job is currently stopped.
- The preferred host for the task is not the local host.
- There is no offset file associated with the local store (a local store with no offset file is treated as stale and eligible for deletion).
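The three conditions combine into a single delete decision. A minimal Java sketch of that check, assuming it is a plain conjunction as the list describes; JobStatus, StoreCleanupDecision and shouldDelete are hypothetical names, not the actual LocalStoreMonitor API.

```java
// Hypothetical sketch of the three-condition cleanup check described above.
// JobStatus, StoreCleanupDecision and shouldDelete are illustrative names only.
enum JobStatus { RUNNING, STOPPED }

final class StoreCleanupDecision {
    private StoreCleanupDecision() {}

    static boolean shouldDelete(JobStatus status, String preferredHost,
                                String localHost, boolean hasOffsetFile) {
        boolean jobStopped = status == JobStatus.STOPPED;             // condition 1
        boolean preferredElsewhere = !localHost.equals(preferredHost); // condition 2
        boolean noOffsetFile = !hasOffsetFile;                         // condition 3
        return jobStopped && preferredElsewhere && noOffsetFile;
    }

    public static void main(String[] args) {
        // The values LocalStoreMonitor logged for Partition_231 at 17:47:40.
        System.out.println(shouldDelete(JobStatus.STOPPED,
                "hostname_masked_2.linkedin.com", "hostname_masked_1.linkedin.com", false));
    }
}
```

Each input is read at a different moment, so nothing in the check itself guarantees that all three still hold when the delete runs.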
However, at the same time, job samza-job-1 is scheduled to run on hostname_masked_1.linkedin.com.
SamzaContainer.log (the container belongs to Samza job waterloo-databus-isb-adapter-i002):
2017-07-10 17:47:31 BrokerProxy [INFO] Creating new SimpleConsumer for host lva1-app17981.prod.linkedin.com:10251 for system samzametadatasystem
2017-07-10 17:47:31 GetOffset [INFO] Validating offset 0 for topic and partition [samza-job-1-002-dedup-state-store,231]
2017-07-10 17:47:31 GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [samza-job-1-002-dedup-state-store,231]. Using it to instantiate consumer.
2017-07-10 17:47:31 BrokerProxy [INFO] Starting BrokerProxy for hostname_masked.linkedin.com:10251
2017-07-10 17:47:39 KeyValueStorageEngine [INFO] 1000000 entries restored...
2017-07-10 17:47:40 SamzaContainer [ERROR] Caught exception/error in process loop.
org.rocksdb.RocksDBException:
    at org.rocksdb.RocksDB.delete(Native Method)
    at org.rocksdb.RocksDB.delete(RocksDB.java:1142)
    at org.apache.samza.storage.kv.RocksDbKeyValueStore.putAll(RocksDbKeyValueStore.scala:161)
    at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:111)
    at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:104)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:104)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:263)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:257)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:257)
    at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:88)
    at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:107)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:846)
    at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:844)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
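By the time StateStoreGC proceeds to clean up the store, the global state has moved on: the job is no longer stopped, and its container on this host is already restoring the store that is about to be deleted.
Because all three cleanup conditions held when StateStoreGC checked them, it deletes the local store, which fails the Samza container (the RocksDBException above is thrown during restore, in the same second as the deletion).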
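This is a check-then-act (time-of-check to time-of-use) race: the GC's inputs are gathered at different moments and acted on later. The single-threaded Java replay below illustrates it under assumptions. HostState, GcRaceReplay and the step ordering are hypothetical, and the ordering is simplified; the logs only show that the GC acted on a view of the job that no longer matched the host.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded replay of a check-then-act race between the GC
// and a starting container. No real Samza classes are used; the step order is
// a simplified illustration, not the exact timeline from the logs.
final class GcRaceReplay {
    private GcRaceReplay() {}

    static final class HostState {
        boolean jobRunningHere = false; // job is stopped when the GC pass starts
        boolean hasOffsetFile = false;  // no commit has written an offset file yet
        boolean storeExists = true;     // the partition's store directory is on disk
    }

    static List<String> replay() {
        HostState host = new HostState();
        List<String> events = new ArrayList<>();

        // 1. GC snapshots job-level state: stopped, preferred host elsewhere.
        boolean jobStopped = !host.jobRunningHere;
        boolean preferredElsewhere = true; // preferred host was another machine
        events.add("gc: job stopped=" + jobStopped);

        // 2. The job is scheduled on this host; its container starts restoring.
        host.jobRunningHere = true;
        events.add("container: restoring store");

        // 3. GC checks the store directory; restore has not written an offset file.
        boolean noOffsetFile = !host.hasOffsetFile;

        // 4. GC deletes based on inputs that were never valid together at one instant.
        if (jobStopped && preferredElsewhere && noOffsetFile) {
            host.storeExists = false;
            events.add("gc: deleted store");
        }

        // 5. The restoring container now fails because its store was removed.
        if (host.jobRunningHere && !host.storeExists) {
            events.add("container: restore failed");
        }
        return events;
    }

    public static void main(String[] args) {
        replay().forEach(System.out::println);
    }
}
```

Re-reading the job status immediately before deleting would shrink the window in this replay but not eliminate it, since the container can still start between that re-check and the delete.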