SAMZA-567

Can't interact with KV store from InitableTask.init()

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.9.0
    • Component/s: container
    • Labels:
      None

      Description

      Attempting to interact with the KeyValueStore from InitableTask.init() results in a rather obscure exception:

      java.util.NoSuchElementException: key not found: TaskName-Partition 3
      at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
      at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
      at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
      at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
      at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
      at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
      at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
      at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
      at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
      at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
      at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
      at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
      at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
      at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
      at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
      ...
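Reading the trace bottom-up, a read (`CachedStore.get`) inserts into a `LinkedHashMap`, the insertion evicts the eldest entry, and the eviction flushes pending writes down to `LoggedStore.putAll` and a producer send. The sketch below models that chain in plain Java, with no Samza dependency. `WriteBackCacheSketch` and all of its members are hypothetical names for illustration; this is a simplified model of the behaviour visible in the trace, not Samza's `CachedStore`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical write-back LRU cache; a simplified model, not Samza's CachedStore. */
public class WriteBackCacheSketch {
    private final int maxEntries;
    private final Map<String, String> backingStore;             // stands in for the underlying store
    private final List<String> changelog = new ArrayList<>();   // stands in for changelog sends
    private final Map<String, String> dirty = new LinkedHashMap<>();
    private final LinkedHashMap<String, String> cache;

    public WriteBackCacheSketch(int maxEntries, Map<String, String> backingStore) {
        this.maxEntries = maxEntries;
        this.backingStore = backingStore;
        // Access-ordered map: the least recently used entry is the eviction candidate.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                boolean evict = size() > WriteBackCacheSketch.this.maxEntries;
                if (evict && dirty.containsKey(eldest.getKey())) {
                    // Evicting an unwritten entry forces a flush of all pending writes.
                    WriteBackCacheSketch.this.flush();
                }
                return evict;
            }
        };
    }

    public void put(String key, String value) {
        dirty.put(key, value);
        cache.put(key, value);
    }

    public String get(String key) {
        String value = cache.get(key);
        if (value == null) {
            value = backingStore.get(key);
            if (value != null) {
                cache.put(key, value); // a read that inserts, and so may evict and flush
            }
        }
        return value;
    }

    public void flush() {
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            backingStore.put(e.getKey(), e.getValue());
            changelog.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    public int changelogSize() {
        return changelog.size();
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("c", "3");
        WriteBackCacheSketch kv = new WriteBackCacheSketch(2, store);
        kv.put("a", "1");
        kv.put("b", "2");
        System.out.println("Changelog writes before get: " + kv.changelogSize());
        kv.get("c"); // cache miss: loads "c", evicts dirty "a", flushes pending writes
        System.out.println("Changelog writes after get: " + kv.changelogSize());
    }
}
```

In this model, even a task that only reads during initialization can end up producing messages if earlier writes are still sitting in the cache.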

      After some investigation, I see that it is not safe to do anything in init() that could produce messages, because SamzaContainer.run calls startTask before startProducers. Interacting with the KV store writes to the changelog, which triggers the exception above. Conceptually, the producers should be initialized first to prevent this, but I do not know what side effects that would have. At a minimum, I'd like this behavior documented and the failure made more obvious, for example with an IllegalStateException.
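The clearer failure requested here can be sketched as a guard that rejects sends made before the producers are started. This is a minimal illustration in plain Java: `ProducerRegistry` and its methods are hypothetical names, not Samza's `SystemProducers` API, and the sketch is not the patch attached to this issue. It only assumes what the report states, namely that task initialization runs before the producers start.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a producer registry; not Samza's SystemProducers. */
public class ProducerRegistry {
    private final Map<String, List<String>> sentBySource = new HashMap<>();
    private boolean started = false;

    /** Registers a source (for example, a task name) that is allowed to send. */
    public void register(String source) {
        sentBySource.put(source, new ArrayList<>());
    }

    public void start() {
        started = true;
    }

    public void send(String source, String message) {
        if (!started) {
            // Fail with a message that names the real problem instead of "key not found".
            throw new IllegalStateException(
                "Cannot send from '" + source + "' before producers are started");
        }
        List<String> sent = sentBySource.get(source);
        if (sent == null) {
            throw new IllegalStateException("Source '" + source + "' was never registered");
        }
        sent.add(message);
    }

    public int sentCount(String source) {
        List<String> sent = sentBySource.get(source);
        return sent == null ? 0 : sent.size();
    }

    public static void main(String[] args) {
        ProducerRegistry producers = new ProducerRegistry();
        producers.register("Partition 3");

        // Models a changelog write issued from init(), before producers are started.
        try {
            producers.send("Partition 3", "changelog write from init()");
        } catch (IllegalStateException e) {
            System.out.println("Rejected early send: " + e.getMessage());
        }

        // Once the producers are started, the same send succeeds.
        producers.start();
        producers.send("Partition 3", "changelog write from process()");
        System.out.println("Messages sent after start: " + producers.sentCount("Partition 3"));
    }
}
```

A guard like this documents the constraint at the point of failure. The alternative the report raises, starting the producers before the tasks, would remove the constraint rather than report it.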

      Discussion that precipitated this issue:
      http://mail-archives.apache.org/mod_mbox/samza-dev/201502.mbox/%3c962D3CAB94174A4E9B771B88A9DFE7B10F1E15E2@SJEXMB02.Tivo.com%3e

        Attachments

        1. 0001-SAMZA-567.patch (1 kB, Naveen)

              People

              • Assignee: Naveen Somasundaram (naveenatceg)
              • Reporter: Tommy Becker (twbecker)
              • Votes: 0
              • Watchers: 4
