Samza / SAMZA-41: Support static partition assignment in LocalJobFactory

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.10.1
    • Component/s: container
    • Labels:

      Description

      LocalJobFactory currently creates a single container (either in ProcessJob or ThreadJob) and assigns all partitions to it using:

      val partitions = Util.getMaxInputStreamPartitions(config)
      

      This works when you only wish to run a single container that processes all messages. There are situations where one container is not enough, though. If you aren't using YARN, we don't provide an easy way to run multiple containers that split partitions between them. This support would be useful for running containers in EC2, for example, where you might run two EC2 instances that host Samza containers sharing the partitions of a single job.

      Some potential solutions:

      1. Let developers statically assign partitions in config file.
      2. Let developers define a container ID and container count, and let LocalJobFactory/ProcessJob/ThreadJob figure out which partitions the container should own. For example, a container with id 0 and container count 2 would own partitions 0, 2, 4, 6, 8, etc.
      3. Write a different JobFactory for this case (e.g. EC2JobFactory)
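Option 2 amounts to a round-robin (modulo) rule: the container with ID i out of n containers owns every partition p where p % n == i. A minimal Java sketch of that arithmetic follows; the class and method names are hypothetical and are not part of Samza's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for option 2 (container ID + container count).
// Not part of Samza's API; it only illustrates the modulo arithmetic.
class ModuloPartitionAssignment {

    // Returns the partitions in [0, partitionCount) owned by containerId when
    // partitions are dealt round-robin across containerCount containers,
    // i.e. every p with p % containerCount == containerId.
    static List<Integer> partitionsFor(int containerId, int containerCount, int partitionCount) {
        if (containerCount <= 0 || containerId < 0 || containerId >= containerCount) {
            throw new IllegalArgumentException(
                "containerId must be in [0, containerCount), got "
                    + containerId + " with containerCount " + containerCount);
        }
        List<Integer> owned = new ArrayList<>();
        // Start at this container's ID and step by the container count.
        for (int p = containerId; p < partitionCount; p += containerCount) {
            owned.add(p);
        }
        return owned;
    }
}
```

For example, partitionsFor(0, 2, 10) yields [0, 2, 4, 6, 8] and partitionsFor(1, 2, 10) yields [1, 3, 5, 7, 9], so the two containers split the partitions with no overlap.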

      Attachments

      1. rb48808.patch
        32 kB
        Yi Pan (Data Infrastructure)
      2. SAMZA-41.0-updated.patch
        68 kB
        Jose
      3. samza-41-design-doc.md
        2 kB
        Monal Daxini
      4. samza-41-design-doc.pdf
        33 kB
        Monal Daxini
      5. samza41-range.patch
        23 kB
        Monal Daxini
      6. samza41-regex.patch
        21 kB
        Monal Daxini

          Activity

          criccomini Chris Riccomini added a comment -

          This is also useful in the case where you're running a job locally, but want to make the container look like a remote container that was part of a YARN cluster. In such a case, the YARN container will have a subset of partitions that it's consuming from. Allowing us to statically assign the same partitions to a local container makes it easy to debug things.

          theduderog Roger Hoover added a comment -

          I think it would be highly valuable to be able to run Samza jobs with multiple containers outside of YARN. For some environments, static configuration is sufficient, and YARN adds unnecessary complexity.

          As Chris Riccomini mentioned, this would also be very helpful for debugging.

          I haven't thought deeply about it, but at first pass I think static partition assignment would be fine. The partitions could even be passed as command-line parameters.

          mdaxini Monal Daxini added a comment -

          I would like to pick this one up, as it's becoming a blocker for my use case.

          I am leaning towards Chris Riccomini's proposed solution #1 as well. I have attached the design doc. I briefly discussed this with Yi Pan (Data Infrastructure).

          closeuris Yan Fang added a comment -

          Hi Monal Daxini, thank you for picking this up and attaching the design doc.

          I'm a little confused about what you are trying to achieve.
          1. Is this change only for the LocalJobFactory, or for all Samza deployments?
          2. Are you trying to filter out some stream partitions? For example, something like kafka.foo[1-4], which only consumes foo's partitions 1 through 4. If so, your proposal looks good. +1 for that.
          3. Are you trying to assign specific partitions to a specific container? The proposal does not read that way, and I'm confused given the title of this JIRA.

          closeuris Yan Fang added a comment -

          If it's (2), that seems outside the scope of this JIRA. We can open a new JIRA for it.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Yan Fang, the idea in Monal Daxini's proposal does not necessarily mean filtering out system stream partitions. It can be a regex matcher that specifies the list of partitions to be assigned to the local container. Hence, I think it can be one way of specifying the static assignment via configuration.
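One way to read the regex idea is as a predicate over SystemStreamPartitions: keep an SSP only if its partition ID matches a configured pattern. The sketch below assumes that interpretation; Ssp is a hypothetical stand-in for Samza's SystemStreamPartition class, and RegexPartitionMatcher is an illustrative name, not the class from the attached patches.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-in for Samza's SystemStreamPartition; not the real class.
final class Ssp {
    final String system;
    final String stream;
    final int partition;

    Ssp(String system, String stream, int partition) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return system + "." + stream + "[" + partition + "]";
    }
}

// Sketch of a regex-based matcher: keep only the SSPs whose partition ID,
// rendered as a decimal string, fully matches the configured pattern.
class RegexPartitionMatcher {
    private final Pattern pattern;

    RegexPartitionMatcher(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    Set<Ssp> match(Set<Ssp> all) {
        Set<Ssp> matched = new LinkedHashSet<>();
        for (Ssp ssp : all) {
            // matches() requires the whole string to match, so "[0-3]" rejects "13".
            if (pattern.matcher(String.valueOf(ssp.partition)).matches()) {
                matched.add(ssp);
            }
        }
        return matched;
    }
}
```

With the pattern [0-3], a stream with partitions 0 to 15 keeps only partitions 0 through 3; using matches() rather than find() means partition 13 is not accepted just because it contains a 3.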

          closeuris Yan Fang added a comment -

          I see. I said "filter out" because I thought that with the proposed API we would already get a subset of task.inputs. It may do more than that.

          "Specify the list of partitions that need to be assigned to the local container" is a little misleading to me. If we are only talking about the LocalJobFactory, this statement is true. But if we are also talking about other deployments, such as YARN or Mesos, this just gives us the set of SSPs; it does not do any assignment. The "grouper" does that job. Since the change will happen in the JobCoordinator, I am assuming it should apply to all deployments.

          mdaxini Monal Daxini added a comment -

          The proposal is to enable partition assignment using the regex only if ProcessJobFactory or ThreadJobFactory is specified. This is in line with the scope of this ticket: supporting static partition assignment for LocalJobFactory.

          What the proposal leaves for later is the ability to run more than one container with ProcessJobFactory and ThreadJobFactory, since monitoring and managing additional containers is more involved. Besides, multiple-container management is being considered for the JobCoordinator in the near future.

          closeuris Yan Fang added a comment -

          OK, that makes sense. Thanks for explaining. Then the proposed solution looks good to me.

          nickpan47 Yi Pan (Data Infrastructure) added a comment - - edited

          Monal Daxini, the proposal looks good to me overall. I just have the following minor comments:

          1. Can we rename it to SystemStreamPartitionMatcher? Per Yan Fang's comments, "filter" can easily lead to the interpretation that this class filters out partitions, but it is really about matching partitions to the local job. I would also prefer to change job.systemstreampartition.filterClass to job.systemstreampartition.matcher.class, following our convention for Samza config.
          2. I would suggest changing the configuration of the matcher to job.systemstreampartition.matcher.config.*, also following our existing convention.

          Thanks!
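A rough sketch of how the proposed keys could be read from a flat config map. The two job.systemstreampartition.matcher.* names come from the comment above; job.factory.class and the ProcessJobFactory/ThreadJobFactory class names are existing Samza settings, and gating on them reflects the LocalJobFactory-only scope Monal described earlier in this thread. The helper class and the regex sub-key used in the usage note are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper; not Samza's actual config classes.
class MatcherConfigSketch {
    static final String MATCHER_CLASS = "job.systemstreampartition.matcher.class";
    static final String MATCHER_CONFIG_PREFIX = "job.systemstreampartition.matcher.config.";
    static final String JOB_FACTORY_CLASS = "job.factory.class";

    // The local job factories the matcher is scoped to in this ticket.
    static final Set<String> LOCAL_FACTORIES = new HashSet<>(Arrays.asList(
        "org.apache.samza.job.local.ProcessJobFactory",
        "org.apache.samza.job.local.ThreadJobFactory"));

    // Matching applies only when a matcher class is configured and the job
    // uses one of the local job factories.
    static boolean matcherEnabled(Map<String, String> config) {
        return config.containsKey(MATCHER_CLASS)
            && LOCAL_FACTORIES.contains(config.get(JOB_FACTORY_CLASS));
    }

    // Returns the matcher.config.* entries with the prefix stripped,
    // e.g. "job.systemstreampartition.matcher.config.regex" becomes "regex".
    static Map<String, String> matcherConfig(Map<String, String> config) {
        Map<String, String> scoped = new TreeMap<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (e.getKey().startsWith(MATCHER_CONFIG_PREFIX)) {
                scoped.put(e.getKey().substring(MATCHER_CONFIG_PREFIX.length()), e.getValue());
            }
        }
        return scoped;
    }
}
```

Given job.systemstreampartition.matcher.config.regex=[0-3], matcherConfig returns a map with the single entry regex -> [0-3], which a matcher implementation could then interpret; keeping the prefix-stripping in one place lets each matcher define its own sub-keys.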

          mdaxini Monal Daxini added a comment -

          Yi Pan (Data Infrastructure), the suggestions look good to me. I have updated the attached design docs with the revised proposal and added additional clarification.

          nickpan47 Yi Pan (Data Infrastructure) added a comment - - edited

          +1 for the proposal. Thanks!

          mdaxini Monal Daxini added a comment -

          Patch is ready.

          I have updated the design docs.

          As per my conversation with Yi Pan, I have attached the patch for SAMZA-41, built and tested against the 0.9.1 branch, along with updated docs and unit tests.

          The changes are fairly non-intrusive, and I am hoping this makes it into the 0.9.1 release.

          Here is the review board link (Yi Pan (Data Infrastructure), I have added you as a reviewer; please feel free to add others):
          https://reviews.apache.org/r/35809/

          Link to the pull request:
          https://github.com/apache/samza/pull/4

          I can send a separate pull request for master later on.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Hi Monal Daxini, sorry, I mistyped in our earlier email conversation. 0.9.1 is a bug-fix release and we are not planning to add new features to it. I would recommend moving this patch to master only.

          Again, my mistake! Sorry for the confusion.

          closeuris Yan Fang added a comment -

          Monal Daxini, sorry, I only just saw your pull request on GitHub. Could you upload the patch to this JIRA? We do not use GitHub pull requests for contributions.

          mdaxini Monal Daxini added a comment - - edited

          Yan Fang, I had already uploaded the patch here; however, it was for version 0.9.x.
          I will upload an updated patch once I have had a chance to get it working against 0.10. The current patch only works with 0.9.x.

          closeuris Yan Fang added a comment -

          Thanks a lot, Monal Daxini !

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Hi, Monal Daxini, do you have an updated patch for 0.10 now? We are trying to identify the JIRAs to be included in 0.10 and it would be good to include your patch if it is ready for 0.10.

          Thanks!

          -Yi

          mdaxini Monal Daxini added a comment - - edited

          Not yet. I will try to get it in next week.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Monal Daxini, did you get a chance to port this patch to 0.10? We are trying to close on 0.10 soon. Thanks a lot in advance!

          nickpan47 Yi Pan (Data Infrastructure) added a comment - - edited

          Monal Daxini, any update? Thanks! I am marking this for 0.10 since it should be useful. I can port it over to 0.10 if you can post the patch for 0.9.1 here.

          Thanks a lot!

          mdaxini Monal Daxini added a comment -

          Hi Yi,
          I had uploaded the patch in late June, and it hasn't changed since then. It works, and we have been using it in production for a while.

          I don't think I will get to porting it over to 0.10 this week. I can start taking a look on Monday.

          What's the timeline for the 0.10 release?

          Thanks

          nickpan47 Yi Pan (Data Infrastructure) added a comment - - edited

          Hi Monal Daxini, did you mean your RB at https://reviews.apache.org/r/35809/? Apache requires uploading the diff file as an attachment to the JIRA, for legal reasons. :/

          Anyway, let me take the patch, port it over to 0.10, and upload it. We are releasing 0.10 now, so getting the already-working patches in is my priority.

          Thanks again!

          mdaxini Monal Daxini added a comment -

          Sorry, I somehow missed uploading the diff patch files. I have attached two patches: one for regex-based and the other for range-based specification of partitions for static assignment.

          The order in which I applied them:
          1. regex patch, then
          2. range patch

          Thanks
          Monal
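The range-based variant can be sketched as parsing a list of inclusive partition ranges and testing membership. The spec syntax assumed here (comma-separated entries such as 0-3,7,10-11) and the PartitionRanges class are illustrative only and may not match the format or code in samza41-range.patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical range-based partition spec, e.g. "0-3,7,10-11".
// The syntax is an assumption for illustration, not the patch's exact format.
class PartitionRanges {
    private final List<int[]> ranges = new ArrayList<>();

    PartitionRanges(String spec) {
        for (String part : spec.split(",")) {
            String s = part.trim();
            if (s.isEmpty()) continue; // tolerate empty entries such as a trailing comma
            int dash = s.indexOf('-');
            int lo, hi;
            if (dash < 0) {
                lo = hi = Integer.parseInt(s); // single partition, e.g. "7"
            } else {
                lo = Integer.parseInt(s.substring(0, dash).trim());
                hi = Integer.parseInt(s.substring(dash + 1).trim());
            }
            if (lo < 0 || hi < lo) {
                throw new IllegalArgumentException("Bad partition range: " + s);
            }
            ranges.add(new int[] {lo, hi});
        }
    }

    // True if the partition falls inside any inclusive range.
    boolean matches(int partition) {
        for (int[] r : ranges) {
            if (partition >= r[0] && partition <= r[1]) return true;
        }
        return false;
    }
}
```

Reversed ranges such as 5-2 and negative IDs are rejected at parse time (the latter via NumberFormatException, a subclass of IllegalArgumentException), so a bad config fails fast instead of silently matching nothing.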

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Porting the change from 0.9.1 to 0.10 turns out to be a bit trickier due to the JobCoordinator changes for the coordinator stream. Also, in the static assignment case, the defined behavior of the coordinator stream among all ProcessJobs processing the same input topics needs some testing and validation as well. Hence, pushing this one to a later release.

          jagadish1989@gmail.com Jagadish added a comment -

          Yi Pan, Monal Daxini: is this patch similar to a filter that ensures a job processes only a subset of the SSPs? Could this be solved by a new FilteringSSPGrouper?
          In addition, I believe we also have a container grouper interface.

          My understanding of this patch may be off, so please correct me if I'm wrong.

          jagadish1989@gmail.com Jagadish added a comment -

          Discussed in person with Yi. The grouper interface just distributes the incoming partitions into a set of tasks; having a filter (as this patch does) makes things cleaner.
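The split described here can be sketched as two independent steps: a matcher/filter narrows the job's set of SSPs, and a grouper then maps every surviving SSP to a task without dropping any. The grouping below follows a group-by-partition style (one task per partition ID across all input streams), which is an assumption chosen for illustration; Ssp, MatchThenGroup, and the task names are hypothetical, and this is not Samza's actual JobCoordinator or grouper code.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical stand-in for Samza's SystemStreamPartition; not the real class.
final class Ssp {
    final String system;
    final String stream;
    final int partition;

    Ssp(String system, String stream, int partition) {
        this.system = system;
        this.stream = stream;
        this.partition = partition;
    }
}

class MatchThenGroup {

    // Step 1: the matcher decides which SSPs this job sees at all.
    static Set<Ssp> match(Set<Ssp> all, Predicate<Ssp> matcher) {
        Set<Ssp> out = new LinkedHashSet<>();
        for (Ssp ssp : all) {
            if (matcher.test(ssp)) {
                out.add(ssp);
            }
        }
        return out;
    }

    // Step 2: the grouper only distributes SSPs into tasks; it never drops any.
    // Group-by-partition style: one task per partition ID, across all streams.
    static Map<String, Set<Ssp>> groupByPartition(Set<Ssp> ssps) {
        Map<String, Set<Ssp>> tasks = new TreeMap<>();
        for (Ssp ssp : ssps) {
            tasks.computeIfAbsent("Partition " + ssp.partition, k -> new LinkedHashSet<>()).add(ssp);
        }
        return tasks;
    }
}
```

Because the grouper never sees the SSPs the matcher rejected, the same grouper can be reused unchanged whether or not static matching is configured.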

          josebarrueta Jose added a comment -

          Attached an updated patch targeting v0.10.1.

          navina Navina Ramesh added a comment -

          Jose Can you please create an RB for this? Thanks!

          josebarrueta Jose added a comment -

          Sorry, Navina Ramesh, I forgot to paste it last time. Here it is: https://reviews.apache.org/r/44422/

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Jose, I left a high-level comment on the RB. Thanks a lot and sorry for the delay.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Rebased against head of the trunk. Tested locally.

          Merged and submitted. Thanks!

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Merged and submitted. Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user mdaxini closed the pull request at:

          https://github.com/apache/samza/pull/4


            People

            • Assignee: josebarrueta Jose
            • Reporter: criccomini Chris Riccomini
            • Votes: 4
            • Watchers: 13
