Uploaded image for project: 'Samza'
  1. Samza
  2. SAMZA-1728

BootstrappingChooser: Call checkOffset only for a lagging partition while choosing.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.0
    • None
    • None

    Description

      We seem to be calling checkOffset even for the ssps that have finished bootstrapping, resulting in decrementing systemStreamLagCounts but not laggingSystemStreamPartitions as that ssp has already been removed from the set. This results in the systemStream removed from systemStreamLagCounts while there are still few lagging ssps for that system stream.

      if (comparatorResult != null && comparatorResult.intValue() >= 0) {
        laggingSystemStreamPartitions -= systemStreamPartition
        systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)

        if (systemStreamLagCounts(systemStream) == 0)

      {     // If the lag count is 0, then no partition for this stream is lagging     // (the stream has been fully caught up).     systemStreamLagCounts -= systemStream   }

       

      This results in the following exception:

      java.util.NoSuchElementException: key not found: SystemStream [system=brooklin-espresso, stream=SampleBrooklinFunctionsIdentityProfileDS]
      at scala.collection.MapLike$class.default(MapLike.scala:228)
      at scala.collection.AbstractMap.default(Map.scala:58)
      at scala.collection.MapLike$class.apply(MapLike.scala:141)
      at scala.collection.AbstractMap.apply(Map.scala:58)
      at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:281)
      at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
      at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
      at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
      at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
      at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
      at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:753)
      at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
      at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)

      Attachments

        Issue Links

          Activity

            People

              atoomula Aditya Toomula
              atoomula Aditya Toomula
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: