  1. Samza
  2. SAMZA-948

CoordinatorStreamSystemConsumer is not threadsafe.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.10.1
    • Fix Version/s: 0.10.1
    • Component/s: None
    • Labels: None

      Description

      While testing the 0.10.1 release, I found some ConcurrentModificationExceptions resulting from the SAMZA-913 patch.

      It appears that the AM UI, the onContainerCompleted callback, and probably other code paths can trigger concurrent coordinator stream bootstraps. This was always a problem, but it now surfaces as a ConcurrentModificationException because bootstrap calls remove() on the bootstrap messages set while another caller may be iterating over it.
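
      The failure mode can be reproduced in isolation. The following is a minimal sketch, not Samza code: it assumes the bootstrap messages are held in a LinkedHashSet (consistent with the LinkedHashMap key iterator frames in the stack traces), and the class, method, and message names are hypothetical. It removes an element mid-iteration on a single thread so the result is deterministic; with two threads the same fail-fast check fires, only nondeterministically.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashSet;
import java.util.Set;

public class BootstrapCmeDemo {
    // Returns true if iterating the set fails after an element is removed mid-iteration.
    static boolean iterationFailsAfterRemove() {
        // Hypothetical stand-in for the consumer's bootstrap messages set.
        Set<String> bootstrappedMessages = new LinkedHashSet<>();
        bootstrappedMessages.add("set-config:a");
        bootstrappedMessages.add("set-config:b");
        bootstrappedMessages.add("set-config:c");
        try {
            for (String message : bootstrappedMessages) {
                // Stands in for a second bootstrap removing an entry
                // while this caller is still iterating.
                bootstrappedMessages.remove("set-config:c");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // The iterator detects the structural change on its next step.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterationFailsAfterRemove());
    }
}
```

      Before remove() was introduced, overlapping bootstraps were presumably still unsafe, but they would not have tripped this check as readily.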

      Here are a couple of stack traces illustrating the issue:
      java.util.ConcurrentModificationException
      at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
      at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
      at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.getBootstrappedStream(CoordinatorStreamSystemConsumer.java:188)
      at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.getBootstrappedStream(AbstractCoordinatorStreamManager.java:85)
      at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:101)
      at org.apache.samza.job.model.JobModel.getContainerToHostValue(JobModel.java:96)
      at org.apache.samza.job.yarn.SamzaTaskManager.onContainerCompleted(SamzaTaskManager.java:210)
      at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1$$anonfun$apply$5.apply(SamzaAppMaster.scala:143)
      at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1$$anonfun$apply$5.apply(SamzaAppMaster.scala:143)
      at scala.collection.immutable.List.foreach(List.scala:318)
      at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1.apply(SamzaAppMaster.scala:143)
      at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1.apply(SamzaAppMaster.scala:143)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at org.apache.samza.job.yarn.SamzaAppMaster$.onContainersCompleted(SamzaAppMaster.scala:143)
      at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)

      java.util.ConcurrentModificationException
      at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
      at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
      at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.getBootstrappedStream(CoordinatorStreamSystemConsumer.java:188)
      at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.getBootstrappedStream(AbstractCoordinatorStreamManager.java:85)
      at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:101)
      at org.apache.samza.job.model.JobModel.getContainerToHostValue(JobModel.java:96)
      at views.$scalate$index_scaml$$anonfun$$scalate$render$3.apply(index.scaml.scala:230)
      at views.$scalate$index_scaml$$anonfun$$scalate$render$3.apply(index.scaml.scala:183)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      at views.$scalate$index_scaml.render(index.scaml.scala:331)
      at org.fusesource.scalate.DefaultRenderContext.capture(DefaultRenderContext.scala:92)
      at org.fusesource.scalate.layout.DefaultLayoutStrategy.layout(DefaultLayoutStrategy.scala:45)
      at org.fusesource.scalate.TemplateEngine$$anonfun$layout$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TemplateEngine.scala:559)
      at org.fusesource.scalate.TemplateEngine$$anonfun$layout$1$$anonfun$apply$mcV$sp$1.apply(TemplateEngine.scala:559)
      at org.fusesource.scalate.TemplateEngine$$anonfun$layout$1$$anonfun$apply$mcV$sp$1.apply(TemplateEngine.scala:559)
      at org.fusesource.scalate.RenderContext$class.withUri(RenderContext.scala:447)
      at org.fusesource.scalate.DefaultRenderContext.withUri(DefaultRenderContext.scala:30)
      at org.fusesource.scalate.TemplateEngine$$anonfun$layout$1.apply$mcV$sp(TemplateEngine.scala:558)
      at org.fusesource.scalate.TemplateEngine$$anonfun$layout$1.apply(TemplateEngine.scala:555)
      at org.fusesource.scalate.TemplateEngine$$anonfun$layout$1.apply(TemplateEngine.scala:555)
      at org.fusesource.scalate.RenderContext$.using(RenderContext.scala:47)
      at org.fusesource.scalate.TemplateEngine.layout(TemplateEngine.scala:555)
      at org.fusesource.scalate.TemplateEngine.layout(TemplateEngine.scala:547)
      at org.fusesource.scalate.TemplateEngine.layout(TemplateEngine.scala:601)
      at org.scalatra.scalate.ScalateSupport$class.layoutTemplateAs(ScalateSupport.scala:223)
      at org.apache.samza.webapp.ApplicationMasterWebServlet.layoutTemplateAs(ApplicationMasterWebServlet.scala:31)
      ... [truncated]
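
      Both traces reach getBootstrappedStream from different threads (the YARN callback handler and the web servlet). One common way to make such a consumer safe is to guard the set with a lock and hand callers a snapshot rather than the live collection. The sketch below illustrates that general technique under stated assumptions; it is not necessarily what the attached patches do, and the class, the lock, and the bootstrap signature are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class SynchronizedBootstrapSketch {
    private final Object bootstrapLock = new Object();
    private final Set<String> bootstrappedMessages = new LinkedHashSet<>();

    // Hypothetical stand-in for bootstrap(): replaces a stale message with a fresh one.
    public void bootstrap(String staleMessage, String freshMessage) {
        synchronized (bootstrapLock) {
            bootstrappedMessages.remove(staleMessage);
            bootstrappedMessages.add(freshMessage);
        }
    }

    // Returns an unmodifiable copy, so callers never iterate the live set.
    public Set<String> getBootstrappedStream() {
        synchronized (bootstrapLock) {
            return Collections.unmodifiableSet(new LinkedHashSet<>(bootstrappedMessages));
        }
    }

    // Runs one writer and one reader concurrently; returns true if neither thread failed.
    static boolean runConcurrently(int iterations) {
        SynchronizedBootstrapSketch consumer = new SynchronizedBootstrapSketch();
        AtomicBoolean failed = new AtomicBoolean(false);
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    consumer.bootstrap("msg-" + (i - 1), "msg-" + i);
                }
            } catch (RuntimeException e) {
                failed.set(true);
            }
        });
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    for (String message : consumer.getBootstrappedStream()) {
                        if (message == null) {
                            failed.set(true);
                        }
                    }
                }
            } catch (RuntimeException e) {
                failed.set(true);
            }
        });
        writer.start();
        reader.start();
        try {
            writer.join();
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !failed.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(10_000));
    }
}
```

      Copying on every read costs an allocation per call, which seems acceptable for infrequent callers such as a UI render or a container-completed callback; a hot path would likely want a different design.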

        Attachments

        1. SAMZA-948_1.patch
          7 kB
          Jake Maes
        2. SAMZA-948_2.patch
          7 kB
          Jake Maes
        3. SAMZA-948_3.patch
          7 kB
          Jake Maes
        4. SAMZA-948_4.patch
          7 kB
          Jake Maes
        5. SAMZA-948_5.patch
          7 kB
          Jake Maes
        6. SAMZA-948_6.patch
          9 kB
          Jake Maes

            People

            • Assignee:
              jmakes Jake Maes
              Reporter:
              jmakes Jake Maes