Samza / SAMZA-1363

Create valid offset file when restoring state store.



Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 0.15.0
    • Component/s: None
    • Labels: None

    Description

      In Samza, TaskCoordinator.commit (or TaskCoordinator.shutdown) creates the offset file for a local store.


      The StateStoreGC feature (SAMZA-1158), introduced in Samza 0.12, periodically deletes stale local stores.

      At LinkedIn, we found a race condition between StateStoreGC and a Samza container in its startup phase.

      Here is an investigation detailing the events (all of these events happened on host hostname_masked_1.linkedin.com):

      LocalStoreMonitor.log:

      2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Job: jobName:waterloo-databus-isb-adapter jobId:i002 has the running status: stopped with preferred host: hostname_masked_2.linkedin.com.
      2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Store isb-state-store not used by the task: Partition 231.
      2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Deleting the task store: /export/content/data/samsa-yarn/samza-logged-stores/waterloo-databus-isb-adapter-i002/isb-state-store/Partition_231, since it has no offset file.
      
      

      StateStoreGC observes three conditions:

      • The job is currently stopped.
      • The preferred host for the task is not the local host.
      • There is no offset file associated with the local store (a local store without an offset file is considered stale and eligible for deletion).
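      Taken together, the three conditions above form a single deletion predicate. The Java sketch below models it under stated assumptions: the class name, the method signature, and the "OFFSET" file name are hypothetical and are not taken from Samza's LocalStoreMonitor source.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical model of the StateStoreGC cleanup decision described above.
// Class, method, and file names are illustrative assumptions, not Samza's
// actual LocalStoreMonitor implementation.
final class StaleStoreCheck {
  // Assumed name of the per-store offset file written on commit/shutdown.
  static final String OFFSET_FILE_NAME = "OFFSET";

  private StaleStoreCheck() {}

  // Returns true only when all three conditions hold:
  // 1. the job is currently stopped,
  // 2. the task's preferred host is some other host, and
  // 3. the task store directory has no offset file.
  static boolean shouldDelete(boolean jobStopped, String preferredHost,
                              String localHost, Path taskStoreDir) {
    boolean preferredElsewhere = !localHost.equals(preferredHost);
    boolean noOffsetFile = !Files.exists(taskStoreDir.resolve(OFFSET_FILE_NAME));
    return jobStopped && preferredElsewhere && noOffsetFile;
  }
}
```

      Nothing in this predicate asks whether a container on this host is restoring the store right now. A store that is mid-restore and has not yet been committed has no offset file, so it satisfies the third condition just like a genuinely stale one.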

      However, at the same time, job samza-job-1 is scheduled to run on hostname_masked_1.linkedin.com.

      SamzaContainer.log (the container belongs to Samza job waterloo-databus-isb-adapter-i002):

      2017-07-10 17:47:31 BrokerProxy [INFO] Creating new SimpleConsumer for host lva1-app17981.prod.linkedin.com:10251 for system samzametadatasystem
      2017-07-10 17:47:31 GetOffset [INFO] Validating offset 0 for topic and partition [samza-job-1-002-dedup-state-store,231]
      2017-07-10 17:47:31 GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [samza-job-1-002-dedup-state-store,231]. Using it to instantiate consumer.
      2017-07-10 17:47:31 BrokerProxy [INFO] Starting BrokerProxy for hostname_masked.linkedin.com:10251
      2017-07-10 17:47:39 KeyValueStorageEngine [INFO] 1000000 entries restored...
      2017-07-10 17:47:40 SamzaContainer [ERROR] Caught exception/error in process loop.
      org.rocksdb.RocksDBException: 
          at org.rocksdb.RocksDB.delete(Native Method)
          at org.rocksdb.RocksDB.delete(RocksDB.java:1142)
          at org.apache.samza.storage.kv.RocksDbKeyValueStore.putAll(RocksDbKeyValueStore.scala:161)
          at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:111)
          at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:104)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:104)
          at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:263)
          at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:257)
          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
          at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
          at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:257)
          at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:88)
          at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:107)
          at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:846)
          at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:844)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      

      

      By the time StateStoreGC proceeds to clean up the store, the global state has moved on: the container on this host is already restoring the task's store.

      Since all three cleanup conditions were met, StateStoreGC deletes the local store, thereby failing the Samza container.
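      The issue title proposes creating a valid offset file when restoring the state store, so that a store being restored no longer looks stale. The Java sketch below illustrates that ordering under stated assumptions: the helper names, the "OFFSET" file name, and writing the starting changelog offset as the file's contents are all hypothetical, and this is not how Samza necessarily implements the change.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the proposed fix: make sure the store directory has an
// offset file *before* changelog restoration starts, so a concurrent
// StateStoreGC check no longer sees an offset-less (stale-looking) store.
// Names, the "OFFSET" file name, and the file contents are assumptions.
final class RestoreWithOffsetFile {
  static final String OFFSET_FILE_NAME = "OFFSET";

  private RestoreWithOffsetFile() {}

  // Write to a temporary file first, then rename it into place, so the
  // offset file only appears once its contents are complete.
  static void writeOffsetFile(Path storeDir, String offset) throws IOException {
    Files.createDirectories(storeDir);
    Path tmp = storeDir.resolve(OFFSET_FILE_NAME + ".tmp");
    Files.write(tmp, offset.getBytes(StandardCharsets.UTF_8));
    Files.move(tmp, storeDir.resolve(OFFSET_FILE_NAME), StandardCopyOption.ATOMIC_MOVE);
  }

  // Create the offset file first, then replay the changelog into the store.
  static void restore(Path storeDir, String startingOffset, Runnable restoreFromChangelog)
      throws IOException {
    writeOffsetFile(storeDir, startingOffset);
    restoreFromChangelog.run();
  }
}
```

      This narrows the race rather than closing it entirely: a StateStoreGC check that runs before the offset file is written can still see an offset-less store. In the timeline above, however, restoration started at 17:47:31 and the check ran at 17:47:40, so the file would already have existed.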


          People

            Assignee: spvenkat Shanthoosh Venkataraman
            Reporter: spvenkat Shanthoosh Venkataraman
            Votes: 0
            Watchers: 2
