SAMZA-662: Samza auto-creates changelog stream without sufficient partitions when container number > 1

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.9.1
    • Component/s: container
    • Labels: None

      Description

We currently auto-create changelog streams. However, when there is more than one container, it is possible that Samza creates a changelog stream with an insufficient number of partitions.

Reason: assume we have an input stream with 3 partitions and we assign 3 containers to this job. According to the JobCoordinator, we will get:

Container(Model)    InputStream Partition    Changelog Partition
0                   0                        0
1                   1                        1
2                   2                        2

      If Container 0 is brought up first, it calls

    // Computed from this container's tasks only -- not from all tasks in the job.
    val maxChangeLogStreamPartitions = containerModel.getTasks.values
      .max(Ordering.by { task: TaskModel => task.getChangelogPartition.getPartitionId })
      .getChangelogPartition.getPartitionId + 1
      

      in SamzaContainer.

The resulting maxChangeLogStreamPartitions is 1, so we will auto-create a changelog stream with only 1 partition.

Similarly, if Container 1 is brought up first, we will get a stream with only 2 partitions.
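
To make the arithmetic concrete, here is a minimal sketch with simplified stand-ins for the Samza models (these case classes are illustrative, not the real TaskModel/ContainerModel):

    case class TaskModel(changelogPartitionId: Int)
    case class ContainerModel(tasks: Seq[TaskModel])

    // Three containers, one task each, changelog partitions 0, 1, 2.
    val containers = Seq(
      ContainerModel(Seq(TaskModel(0))),
      ContainerModel(Seq(TaskModel(1))),
      ContainerModel(Seq(TaskModel(2))))

    // Buggy, per-container view: whichever container creates the topic
    // first uses its own local max + 1.
    containers.map(_.tasks.map(_.changelogPartitionId).max + 1)
    // => List(1, 2, 3)

    // What is actually needed: the max over every task in the job.
    containers.flatMap(_.tasks).map(_.changelogPartitionId).max + 1
    // => 3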

Attachments

1. SAMZA-662-0.9.1-branch.patch
        3 kB
        Jakob Homan
      2. SAMZA-662.v1.patch
        3 kB
        Guozhang Wang

          Activity

Sam Stokes added a comment -

Hi Yi Pan (Data Infrastructure), that explains it - I am on Samza 0.8 (which I know is fairly out of date). Apologies for the stale report.

Yi Pan (Data Infrastructure) added a comment -

Hi Sam Stokes, since Samza 0.9 the changelog topics are created with the log-compaction option by default, and that code has not changed since. In which version did you see the changelog auto-created without the log-compaction option? I would also recommend trying the Samza 0.10 release candidate to see whether the changelog is created with the log-compaction option there; if not, please file a JIRA ticket. Thanks!
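
For anyone checking which behavior a given version produces, the topic-level config can be read back with the Kafka 0.8.x-era Scala admin API (a sketch; class and method names are from that era's client, and the ZooKeeper address and topic name are placeholders):

    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    // Prints "compact" for a compacted changelog; an empty result means the
    // broker's default cleanup policy applies.
    val config = AdminUtils.fetchTopicConfig(zkClient, "my-job-my-store-changelog")
    println(config.getProperty("cleanup.policy"))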

Sam Stokes added a comment -

          I have run into a related bug with changelog topic autocreation, and I'm wondering whether this patch would fix it also. I have found that the changelog topics get autocreated without compaction enabled (i.e. presumably according to the broker's default log cleanup settings).

          This leads to three problems:

          • on task restart, it takes a very long time to replay the changelog before processing any messages
          • the changelog topic grows much faster than expected
          • potentially the changelog topic could have older messages deleted once the retention window is reached, which is not the expected semantics

          It seems like compaction should always be enabled for changelog topics, and so Samza should set compaction when autocreating them. Do you think that will be the case after this patch, or should I open a separate JIRA?
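
As a stopgap until auto-creation sets it, the changelog topic can be pre-created with compaction enabled so the broker default never applies - a sketch against the same Kafka 0.8.x-era admin API (topic name, partition count, and replication factor below are placeholders):

    import java.util.Properties
    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    val topicConfig = new Properties()
    topicConfig.put("cleanup.policy", "compact")
    // The partition count must cover every changelog partition the job will use.
    AdminUtils.createTopic(zkClient, "my-job-my-store-changelog", 3, 2, topicConfig)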

Charles Allen added a comment -

Never mind - I see that it is the only patch in 0.9.1.

Charles Allen added a comment -

A patch for 0.9.0 would be much appreciated.

Yi Pan (Data Infrastructure) added a comment -

          +1. Merged and committed to 0.9.1 branch.

          Thanks!

Guozhang Wang added a comment -

          +1 on the patch for 0.9.0, LGTM.

Yi Pan (Data Infrastructure) added a comment -

Agreed on porting to 0.9.1 as well. I will download and test the patch today.

Jakob Homan added a comment -

          Re-opening for 0.9.0 branch.

Jakob Homan added a comment -

This is a rather large bug and is affecting users on the 0.9.0 release (SAMZA-685). We should, in general, provide a patch for bugs back to the affected branch, both to make it easier for those on that branch to get the fix quickly and to minimize the work needed to create a bug-fix release from that branch.

Attaching a patch that applies to 0.9.0. The original patch did not apply cleanly, so this is not identical to it, but no significant changes were needed to make it apply. Someone else should still review the patch.

          It's unfortunate there's no test for this fix, which would make it easier to be certain there's no interbranch funkiness with the new patch. Opened SAMZA-686 to address this, but we don't need to wait for a test.

          It'd be good to get started on the 0.9.1 release as soon as possible.

Yan Fang added a comment -

          Committed. Thanks!

Guozhang Wang added a comment -

Yan Fang - attached. Thanks!

Yan Fang added a comment -

          +1 for the patch. Could you upload the patch here? Thank you.

Guozhang Wang added a comment -

Created https://reviews.apache.org/r/34066/
Yan Fang added a comment -

Agreed.

Yi Pan (Data Infrastructure) added a comment -

          So, won't it make more sense to put this method inside JobModel?
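
(A minimal sketch of that idea, assuming JobModel exposes its ContainerModels via getContainers as a java.util.Map; the method shape here is illustrative, not the final API:

    import scala.collection.JavaConversions._

    // Derive the changelog partition count from every task in the job,
    // not just the tasks assigned to one container.
    def maxChangeLogStreamPartitions(jobModel: JobModel): Int =
      jobModel.getContainers.values
        .flatMap(_.getTasks.values)
        .map(_.getChangelogPartition.getPartitionId)
        .max + 1

This way every container computes the same count regardless of startup order.)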

Yan Fang added a comment -

Yes, agreed. It's easier and cleaner to get it from the model instead of calculating it.

Navina Ramesh added a comment -

Yeah, you are right.

I had assumed that containerModel encompassed the information about all containers (like a map). +1 for the bug.

I guess the maxChangelogPartition should be part of one of the models in the hierarchy rather than being computed by each container.

Yan Fang added a comment - (edited)

I think one thing you may have missed is that different containers get different containerModels.

Yan Fang added a comment -

It is 1 because in

    val maxChangeLogStreamPartitions = containerModel.getTasks.values
      .max(Ordering.by { task: TaskModel => task.getChangelogPartition.getPartitionId })
      .getChangelogPartition.getPartitionId + 1

the containerModel only contains the TaskModels assigned to it. So if the containerModel is only assigned, say, the TaskModel whose changelog partition is 0, that is the only TaskModel considered by

    .max(Ordering.by { task: TaskModel => task.getChangelogPartition.getPartitionId })

and getChangelogPartition.getPartitionId returns 0, so maxChangeLogStreamPartitions is 0 + 1 = 1.

Navina Ramesh added a comment -

> The maxChangeLogStreamPartition is 1.

Based on your example, isn't maxChangeLogStreamPartitions = 3?

    containerModel.getTasks.values.max(Ordering.by { task: TaskModel => task.getChangelogPartition.getPartitionId })

should return the TaskModel (containerId=2, inputStreamPartition=2, changelogPartition=2). Hence, maxChangeLogStreamPartitions should be 3.

The order in which the containers are brought up shouldn't matter as long as the ContainerModel remains the same. Please correct me if I have misunderstood something here.


            People

  • Assignee: Guozhang Wang
  • Reporter: Yan Fang
  • Votes: 0
  • Watchers: 7
