SAMZA-169: Task initialization fails if changelog stream does not already exist

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.7.0
    • Component/s: kafka, kv
    • Labels: None

    Description

      The first time you run a job that uses state with a changelog (such as in SAMZA-152), before the changelog stream exists, you get the following exception:

      2014-03-04 18:15:20 SamzaContainer [ERROR] Caught exception in process loop.
      org.apache.samza.SamzaException: Missing a change log offset for SystemStreamPartition [partition=Partition [partition=1], system=kafka, stream=wikipedia-stats-changelog].
      	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:84)
      	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3$$anonfun$1.apply(TaskStorageManager.scala:84)
      	at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
      	at scala.collection.AbstractMap.getOrElse(Map.scala:58)
      	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:84)
      	at org.apache.samza.storage.TaskStorageManager$$anonfun$startConsumers$3.apply(TaskStorageManager.scala:81)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
      	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
      	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
      	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      	at org.apache.samza.storage.TaskStorageManager.startConsumers(TaskStorageManager.scala:81)
      	at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:60)
      	at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:103)
      	at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:579)
      	at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:579)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
      	at org.apache.samza.container.SamzaContainer.startStores(SamzaContainer.scala:579)
      	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504)
      	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81)
      	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
      

      If debug logging is turned on, the following telltale line appears in the log earlier:

      2014-03-04 18:15:20 KafkaSystemAdmin [DEBUG] Got metadata for streams: Map(wikipedia-stats-changelog -> {TopicMetadata for topic wikipedia-stats-changelog -> 
      No partition metadata for topic wikipedia-stats-changelog due to kafka.common.LeaderNotAvailableException})
      

      Full log: https://gist.github.com/ept/1fecad1b2d79797990a8
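
The debug line above suggests that the metadata lookup ran before the newly created topic had a partition leader, so no changelog offsets came back. One common way to tolerate that window is to retry the lookup with backoff. The following is a minimal sketch of that idea only, and not necessarily what the attached patch does. `LeaderNotAvailable`, `fetch_with_retry`, `make_flaky_fetch`, and the returned offset values are hypothetical names and data for illustration; they are not Samza or Kafka APIs.

```python
import time


class LeaderNotAvailable(Exception):
    """Hypothetical stand-in for kafka.common.LeaderNotAvailableException."""


def fetch_with_retry(fetch_metadata, topic, max_attempts=5,
                     initial_backoff_s=0.1, sleep=time.sleep):
    """Call fetch_metadata(topic), retrying while the topic has no leader.

    fetch_metadata is an assumed callable that returns a dict of
    partition -> offset, or raises LeaderNotAvailable.
    """
    backoff = initial_backoff_s
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch_metadata(topic)
        except LeaderNotAvailable:
            if attempt == max_attempts:
                raise  # give up and surface the original error
            sleep(backoff)
            backoff *= 2  # exponential backoff between attempts


def make_flaky_fetch(failures):
    """Build a fake metadata fetcher that fails `failures` times, then succeeds."""
    calls = {"count": 0}

    def fetch(topic):
        calls["count"] += 1
        if calls["count"] <= failures:
            raise LeaderNotAvailable(topic)
        return {0: 0, 1: 0}  # made-up offsets for two partitions

    return fetch, calls


if __name__ == "__main__":
    fetch, calls = make_flaky_fetch(failures=2)
    offsets = fetch_with_retry(fetch, "wikipedia-stats-changelog")
    print(offsets, "after", calls["count"], "attempts")
```

The `sleep` parameter is injectable so the retry behaviour can be exercised without real delays.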

      Attachments

        1. SAMZA-169.patch
          8 kB
          Martin Kleppmann

            People

              Assignee: Martin Kleppmann (martinkl)
              Reporter: Martin Kleppmann (martinkl)