SAMZA-947

TaskAssignmentManager registration exception when partition count changes.

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.1
    • Fix Version/s: 0.10.1
    • Component/s: None
    • Labels: None

    Description

      The GroupByContainerCount grouper deletes the persisted task mapping if the partition count has changed, because there may be fewer tasks, which would make the old mapping invalid.

      To delete the mapping, the TaskAssignmentManager registers itself and writes null for all the keys. Later, when the recalculated mapping is saved, it tries to register itself again, which causes this exception:
      Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
      at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
      at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
      at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
      at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
      at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
      at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
      at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
      at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
      at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
      at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
      at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
      at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
      at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
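
      The failure mode in the trace above can be reproduced in isolation. The sketch below is not Samza code and is not necessarily what the attached patches do: StrictProducer and GuardedManager are hypothetical stand-ins, assuming only that the producer rejects a second registration for the same source name. It shows the double registration failing, and one possible guard that makes the second call a no-op.

```java
import java.util.HashSet;
import java.util.Set;

public class RegistrationSketch {

    /** Hypothetical stand-in for a producer that rejects duplicate registrations. */
    static class StrictProducer {
        private final Set<String> sources = new HashSet<>();

        void register(String source) {
            // Set.add returns false when the element is already present.
            if (!sources.add(source)) {
                throw new IllegalStateException(source + " is already registered");
            }
        }
    }

    /** Hypothetical manager that remembers whether it has already registered. */
    static class GuardedManager {
        private final StrictProducer producer;
        private final String source;
        private boolean registered = false;

        GuardedManager(StrictProducer producer, String source) {
            this.producer = producer;
            this.source = source;
        }

        void register() {
            if (registered) {
                return; // second and later calls do nothing
            }
            producer.register(source);
            registered = true;
        }
    }

    /** Registers the same source twice directly; true if the second call throws. */
    public static boolean secondRegisterFails() {
        StrictProducer producer = new StrictProducer();
        producer.register("SamzaTaskAssignmentManager"); // delete phase
        try {
            producer.register("SamzaTaskAssignmentManager"); // save phase
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Registers twice through the guard; true if neither call throws. */
    public static boolean guardedRegisterSucceeds() {
        GuardedManager manager =
            new GuardedManager(new StrictProducer(), "SamzaTaskAssignmentManager");
        try {
            manager.register(); // delete phase
            manager.register(); // save phase, skipped by the guard
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded second register fails: " + secondRegisterFails());
        System.out.println("guarded second register succeeds: " + guardedRegisterSucceeds());
    }
}
```

      An alternative with the same effect would be to register once up front and reuse that registration for both the delete and the save, rather than tracking a flag.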

      In a YARN environment, the AM restarts and, since the task mapping has already been deleted, the second attempt to save the mapping succeeds.

      Since this issue only occurs when the partition count changes and is recoverable, I'm marking it as low priority.

      Attachments

        1. SAMZA-947_2.patch
          6 kB
          Jake Maes
        2. SAMZA-947_1.patch
          6 kB
          Jake Maes

          People

            Assignee: jmakes Jake Maes
            Reporter: jmakes Jake Maes
            Votes: 0
            Watchers: 2
