Samza / SAMZA-947

TaskAssignmentManager registration exception when partition count changes.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.10.1
    • Fix Version/s: 0.10.1
    • Component/s: None
    • Labels: None

    Description

      The GroupByContainerCount grouper deletes the persisted task mapping when the partition count has changed, because there may now be fewer tasks, which would make the old mapping invalid.

      To delete the mapping, the TaskAssignmentManager registers itself with the coordinator stream producer and writes null for all the keys. Later, when the recalculated mapping is saved, it tries to register itself a second time, which causes this exception:
      Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
      at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
      at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
      at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
      at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
      at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
      at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
      at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
      at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
      at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
      at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
      at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
      at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
      at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)

      In a YARN environment, the AM restarts. Because the task mapping was already deleted during the first attempt, the second attempt to save the mapping succeeds.
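
      The sequence above can be illustrated with a small, self-contained sketch. It does not use the Samza classes: FakeProducer, FakeAssignmentManager, and runAttempt are hypothetical stand-ins, and the staleness check (comparing map sizes) is a simplifying assumption. It only models a producer that rejects a second registration of the same source, and a manager that registers before every write.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DuplicateRegistrationSketch {

    /** Hypothetical stand-in for a producer that rejects duplicate source registration. */
    static class FakeProducer {
        private final Set<String> registeredSources = new HashSet<>();

        void register(String source) {
            // Set.add returns false when the source is already present.
            if (!registeredSources.add(source)) {
                throw new IllegalStateException(source + " is already registered with the producer");
            }
        }
    }

    /** Hypothetical stand-in for the manager: it registers itself before every write. */
    static class FakeAssignmentManager {
        static final String SOURCE = "SamzaTaskAssignmentManager";
        private final FakeProducer producer;
        private final Map<String, Integer> persisted;

        FakeAssignmentManager(FakeProducer producer, Map<String, Integer> persisted) {
            this.producer = producer;
            this.persisted = persisted;
        }

        void deleteAll() {
            producer.register(SOURCE);
            persisted.clear(); // models writing null for every key
        }

        void save(Map<String, Integer> mapping) {
            producer.register(SOURCE);
            persisted.putAll(mapping);
        }
    }

    /** Models one application master attempt; returns true if the new mapping was saved. */
    static boolean runAttempt(Map<String, Integer> persisted, Map<String, Integer> recalculated) {
        FakeProducer producer = new FakeProducer(); // each process start gets a fresh producer
        FakeAssignmentManager manager = new FakeAssignmentManager(producer, persisted);
        // Assumption: a differing task count stands in for "the partition count changed".
        boolean stale = !persisted.isEmpty() && persisted.size() != recalculated.size();
        try {
            if (stale) {
                manager.deleteAll(); // first registration
            }
            manager.save(recalculated); // second registration if deleteAll ran
            return true;
        } catch (IllegalStateException e) {
            System.out.println("attempt failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> persisted = new HashMap<>();
        persisted.put("Partition 0", 0);
        persisted.put("Partition 1", 1);

        Map<String, Integer> recalculated = new HashMap<>();
        recalculated.put("Partition 0", 0);

        System.out.println("first attempt saved: " + runAttempt(persisted, recalculated));
        System.out.println("second attempt saved: " + runAttempt(persisted, recalculated));
    }
}
```

      In this model the first attempt fails on the second register call but leaves the persisted mapping empty, so the second attempt skips the delete step, registers only once, and saves the recalculated mapping.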

      Since this issue only occurs when the partition count changes and is recoverable, I'm marking it as low priority.


          People

            Assignee: jmakes Jake Maes
            Reporter: jmakes Jake Maes