  Samza / SAMZA-71

Support new partitioning strategies

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.0
    • Fix Version/s: 0.8.0
    • Component/s: container
    • Labels:

      Description

      Currently, the number of StreamTask instances created for a Samza job is equal to the maximum number of partitions across all input streams. For example, if your Samza job is using two YARN containers and has two input streams (IS1 and IS2), where IS1 has 4 partitions and IS2 has 8 partitions, then the Samza job will have a total of max(4,8)=8 partitions. Therefore, 8 StreamTask instances would be created (spread as evenly as possible across the two YARN containers).

      This scheme works for co-grouping when both input streams have the same number of partitions and are partitioned using the same partitioning scheme (e.g. a Kafka partitioner that partitions by member ID). The job's parallelism is limited by the number of StreamTask instances, and therefore by the max number of partitions across all input streams (8 in the example above).

      We can actually do better than these guarantees. We should change Samza's partitioning style to behave in the following way:

      1. Support a task.partition.scheme=max config. Samza will create one stream task instance for each input stream partition. In the example above, IS1 has 4 partitions, and IS2 has 8 partitions, so Samza would create 4+8=12 StreamTasks. Each input stream's partition would be mapped to a unique StreamTask that would receive messages ONLY from that input stream/partition pair, and from no other. Using this style of partitioning increases a Samza job's possible parallelism to the absolute maximum (based on Kafka semantics, which allow at most one consumer per input stream/partition pair).

      2. Support a task.partition.scheme=cogroup config. Samza will create a number of stream task instances equal to the greatest common divisor (GCD) of all input stream partition counts. In the example above, IS1 has 4 partitions, and IS2 has 8. GCD(4,8)=4, so Samza would create four StreamTask instances. If IS1 had 4 partitions, and IS2 had 6, then GCD(4,6)=2, so the Samza job would have two StreamTask instances. Using this style can decrease a Samza job's parallelism, but provides the guarantee that a StreamTask instance will receive all messages, across all input streams, for each key that it's in charge of. For example, if a StreamTask is consuming AdViews and AdClicks, and both are partitioned by member ID, but AdViews has 12 partitions and AdClicks has 8 partitions, then there will be GCD(12,8)=4 StreamTask instances, each instance will receive roughly 1/4th of all clicks and views, and all clicks and views for a given member ID will be mapped to just one of the StreamTasks, so aggregation/joining will be possible.

      The default task.partition.scheme will be max, when the user hasn't specified a partition scheme. Thus, the default will not allow any aggregation or joining across input streams.

      With both of these styles, we can still use the Partition class (and getPartitionId) to identify each StreamTask instance, but we will need to devise a deterministic way to map from each input stream/partition pair to each StreamTask partition.

      In the case of style #1 (max), consider the case where we have IS1 with 4 partitions and IS2 with 8 partitions. We can use the order of task.inputs to define an ordering across stream names. We can then instantiate all 12 StreamTasks, and simply iterate over all input streams, in their task.inputs order and with sorted partition sets, to do the mapping. If we had task.inputs=IS2,IS1, the mapping would look like this:

      IS2:0 - StreamTask:0
      IS2:1 - StreamTask:1
      IS2:2 - StreamTask:2
      IS2:3 - StreamTask:3
      IS2:4 - StreamTask:4
      IS2:5 - StreamTask:5
      IS2:6 - StreamTask:6
      IS2:7 - StreamTask:7
      IS1:0 - StreamTask:8
      IS1:1 - StreamTask:9
      IS1:2 - StreamTask:10
      IS1:3 - StreamTask:11
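
      A minimal sketch of how this deterministic mapping could be computed (illustrative only; the class name, method names, and use of plain Java collections are assumptions, not actual Samza code): iterate over streams in task.inputs order, then over each stream's partitions in sorted order, assigning increasing StreamTask numbers.

      import java.util.LinkedHashMap;
      import java.util.List;
      import java.util.Map;

      // Illustrative sketch of the "max" scheme: one StreamTask per input
      // stream/partition pair, numbered in task.inputs order.
      public class MaxSchemeMapping {
        // streams is the task.inputs order; partitionCounts.get(i) is the
        // partition count of streams.get(i).
        static Map<String, Integer> assign(List<String> streams, List<Integer> partitionCounts) {
          Map<String, Integer> taskFor = new LinkedHashMap<>();
          int nextTask = 0;
          for (int i = 0; i < streams.size(); i++) {
            for (int p = 0; p < partitionCounts.get(i); p++) {
              taskFor.put(streams.get(i) + ":" + p, nextTask++); // e.g. "IS2:0" -> StreamTask 0
            }
          }
          return taskFor;
        }

        public static void main(String[] args) {
          // task.inputs=IS2,IS1 with 8 and 4 partitions reproduces the table above.
          assign(List.of("IS2", "IS1"), List.of(8, 4))
              .forEach((sp, task) -> System.out.println(sp + " - StreamTask:" + task));
        }
      }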

      In the case of style #2 (cogroup), consider the case where IS1 has 8 partitions and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be created. The mapping in this case should then be:

      IS1:0 - StreamTask:0
      IS1:1 - StreamTask:1
      IS1:2 - StreamTask:2
      IS1:3 - StreamTask:3
      IS1:4 - StreamTask:0
      IS1:5 - StreamTask:1
      IS1:6 - StreamTask:2
      IS1:7 - StreamTask:3
      IS2:0 - StreamTask:0
      IS2:1 - StreamTask:1
      IS2:2 - StreamTask:2
      IS2:3 - StreamTask:3
      IS2:4 - StreamTask:0
      IS2:5 - StreamTask:1
      IS2:6 - StreamTask:2
      IS2:7 - StreamTask:3
      IS2:8 - StreamTask:0
      IS2:9 - StreamTask:1
      IS2:10 - StreamTask:2
      IS2:11 - StreamTask:3

      As you can see, the assignment is done by modding each input stream's partition number by the GCD value (4, in this case). This assignment strategy has the nice guarantee that keys will map to the same StreamTask across input streams with different partition counts (provided that they're partitioned by the same key). For example, member ID 1213 % 8 = partition 5 in IS1, and 1213 % 12 = partition 1 in IS2. If you then mod by the GCD (4), you get 5 % 4 = 1 and 1 % 4 = 1. The same holds true for other keys as well, e.g. member ID 1211:

      1211 % 8 = 3, and 3 % 4 = 3
      1211 % 12 = 11, and 11 % 4 = 3

      Both of these partition assignment schemes work only as long as the task.inputs stream order is static (or new streams are only appended to the end), and each input stream's partition count is static and never changes.

      You can use the Euclidean algorithm to find the GCD:

      http://www-math.ucdenver.edu/~wcherowi/courses/m5410/exeucalg.html
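
      For reference, here is a hedged sketch of the Euclidean algorithm together with the cogroup assignment described above (class and variable names are illustrative, not actual Samza code):

      // Illustrative sketch: Euclidean GCD plus the cogroup assignment
      // (partition number mod GCD), matching the mapping tables above.
      public class CogroupSchemeMapping {
        // Euclidean algorithm for the greatest common divisor.
        static int gcd(int a, int b) {
          while (b != 0) {
            int t = a % b;
            a = b;
            b = t;
          }
          return a;
        }

        public static void main(String[] args) {
          int[] partitionCounts = {8, 12}; // IS1 and IS2
          int tasks = partitionCounts[0];
          for (int count : partitionCounts) {
            tasks = gcd(tasks, count);     // GCD(8, 12) = 4 StreamTasks
          }
          // Each partition p of any input stream maps to StreamTask p % tasks,
          // e.g. IS1:5 -> 5 % 4 = 1 and IS2:9 -> 9 % 4 = 1.
          for (int p = 0; p < 12; p++) {
            System.out.println("IS2:" + p + " - StreamTask:" + (p % tasks));
          }
        }
      }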


          Activity

          criccomini Chris Riccomini added a comment -

          Thinking on this more, I think we should default to support co-partitioning, not max.

          criccomini Chris Riccomini added a comment -

          This ticket should be broken into subtasks.

          criccomini Chris Riccomini added a comment - edited

          Several things to think about here:

          1. The existing proposal does not include a setting that mimics the current partitioning behavior.
          2. We are potentially increasing the number of partitions of a Samza job quite dramatically when max is set.
          3. We are creating a configuration that must remain static after the first time a job is run.

          Right now, partitioning (1) is done by getting the largest topic (by partition count) and using that topic's partition count as the partition count for the Samza job. For example, if the largest input topic has a partition count of 48, then a Samza job that consumes this topic will have 48 partitions. Adding more input topics does not increase the partition count, provided that the new input topics have a partition count <= the max partition count.

          With the `co-group` strategy, adding a new input topic could potentially SHRINK the parallelism, in cases where the new GCD of all the topic partition counts is less than it was before. In the worst case, where the only common divisor is 1, the parallelism would shrink to 1. With the new `max` strategy, adding any new topic to the input set would result in an increase in the partition count (2) of the job. This is a new behavior, and could impact Samza jobs in the following ways:

          a. The checkpoint topic would no longer be valid, since it would not have partitions for the newly added partitions. For example, if a Samza job were set to max, and had 48 partitions, and a new 16 partition input topic were added, the checkpoint topic would have only 48 partitions, but the new Samza job would be running with 48 + 16 = 64 partitions.
          b. In cases where there are a large number of input topics, even if they individually only have a few partitions each, we could end up with 1000s of partitions in total. This is problematic for our checkpointing and state management, as we have per-partition topics for each of these stream types. Consider the case where a Samza job has a single state store, and 1000 input streams each with 4 partitions. In the existing model, this would result in 4 partitions for the Samza job (max(input stream partitions) = 4), which in turn results in 4 partitions for the checkpoint topic and 4 partitions for the state topic. If the new `max` configuration is set, instead, then the Samza job will have 1000 * 4 = 4000 partitions. This results in a 4000 partition checkpoint topic, and a 4000 partition state topic. Partitions in Kafka are not cheap (ZK overhead, inodes on disk, etc). This partition count is too large to support.

          Lastly, adjusting the input topics or partitioning strategy after a job has already been (3) run can result in unexpected behavior. Adding a topic, as already described above, might accidentally increase the partition count of the Samza job, which Samza assumes will never happen. Changing the partitioning strategy from max to co-group (or vice-versa) will result in state topics and checkpoint topics getting incorrectly assigned to new partitions.

          jghoman Jakob Homan added a comment -

          Sidestepping the above discussion regarding co-grouping, it would be reasonably easy to support two more types of partitioning strategies that could be immediately useful: one topic-partition (TP) per task instance (TI) and one TI per Samza container.

          Because of what's described above, we're severely limited in our ability to scale jobs in ways that are not functions of the underlying streams' partition counts. SAMZA-82 changed how partitioning is done relative to the description above. Now, the AM determines the topics and partitions that exist across the job and explicitly passes to the containers which TPs they are responsible for. The containers then group those TPs by their partition and create a new TI for each group. By adding another directive from the AM to the container describing how to group the TPs into TIs, we can handle these new strategies.

          Much of the complexity above is dictated by the current format of the checkpoint log, which writes all of the TP offsets that a particular TI is responsible for as one map. I'd like to change this so that each TP and offset is written to the checkpoint log by itself. On startup, all of the containers will read through the entire log, discarding the checkpoints for TPs they are not handling. This adds a bit of cost to job startup, but the checkpoint log is not large (and its total size in bytes is not changing, just the number of entries in it). Also, we can continue to partition the log by some reasonable number (initial partition count?) so that in the standard grouped-by-partition scheme, containers only have to scan the partitions for which they are responsible. Finally, the key-dedupe feature we rely on for state will assist us here as well, pruning out old offsets. The concern is that, since the offsets are no longer stored as a group, individual offsets may age out separately from their fellows, but this problem is no larger than the current one where whole groups could disappear.
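
          For illustration, a per-TP checkpoint entry might look roughly like the following sketch (the class name, key format, and fields are hypothetical, not the final format):

          // Hypothetical sketch of a per-TP checkpoint entry; not the actual format.
          public record TopicPartitionCheckpoint(String topic, int partition, long offset) {
            // Compaction key so Kafka's key-dedupe keeps only the latest offset per TP.
            public String checkpointKey() {
              return topic + "/" + partition;
            }
          }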

          Three partitioning strategies that appear immediately useful:

          • GROUP_BY_PARTITION - Our current strategy. Each TI is defined by its partition and all TPs with that partition are funneled through it.
          • ONE_TI_PER_TP - Each TP gets its own TI. The highest level of fanout possible and the maximum amount of granularity we can have with a Samza job. The AM can then round robin the assignment of the TPs to the containers, getting an even distribution of TPs across the job. This would be useful for consuming lots and lots of TPs that do not need to coordinate or group in any way.
          • ONE_TI_FOR_ALL_TP - One TI for all the TPs (on a container). By setting the number of containers for the job to 1, all messages would be funneled through a single container. This would be useful for jobs that want an all-encompassing view of the data flow, even if it's partitioned (otherwise we'd need to go through a repartitioning step and incur the latency and data storage cost).

          (Additionally, it would probably be useful to provide a pluggable interface for matching TIs to TPs. For example, some may wish to group by topic name prefix (i.e., an org uses "DATA-CENTER-x-TOPIC-NAME-FOO" as its Kafka topic naming convention and wants to group by data center via a topic name prefix). This can be done at a later time.)

          Once these strategy concepts exist, the AM can pass the strategy to the container as a parameter. The container can then instantiate the correct number of TIs, assigning the correct TPs as it goes.

          The immediate tasks are to refactor the checkpoint log to be independent of partitioning, and to introduce the partitioning strategy to the AM and containers.

          How does this sound?

          criccomini Chris Riccomini added a comment -

          Sounds pretty good. A few comments:

          1. Don't forget that the LocalJobFactory (ThreadJob/ProcessJob) will probably need to be updated as well.
          2. Can you do the new KafkaCheckpointManager as a separate class (keep the old one)? This will make it easier for us to migrate existing jobs. Once committed, we can mark the old one as deprecated, and open a new JIRA to remove it.
          3. I think TaskContext.getPartition will need to be changed/updated.
          4. We'd have to think about the pluggability part of this. The problem with the DATA-CENTER example is that not all partitions for a given data center are guaranteed to be assigned to a single container. Essentially, there are two levels of assignment going on: partition -> container, and partition -> TI. We only control the partition -> TI mapping with this setting, and if the partition -> container mapping is incorrect, you're going to get weird (wrong) results.
          5. Do you want to sub-task this to keep the checkpoint refactor and the partition strategy changes?

          jghoman Jakob Homan added a comment -

          2. Can you do the new KafkaCheckpointManager as a separate class (keep the old one)? This will make it easier for us to migrate existing jobs. Once committed, we can mark the old one as deprecated, and open a new JIRA to remove it.

          My preference would be to nuke the old class. Existing jobs can either rename themselves or manually delete the checkpoint log (it would be great if Kafka had a way to force log deletion, but according to Jun that's still a ways off; see KAFKA-330). Do you think there are enough Samza jobs in the wild for which this would be an insurmountable problem to justify the extra cost and code? If there are enough, it would be easy to write a quick one-off tool to generate a new-format log from the old-style one. Maybe the log should have a version number added to its name (in the absence of metadata capabilities about the Kafka log). My concern is that we're iterating on this code pretty quickly; it would be easy to start accumulating old-style readers and writers. Once we've got a release out, it'll be important to provide this type of backwards compatibility support.

          4. We'd have to think about the pluggability part of this. The problem with the DATA-CENTER example is that, not all partitions for a given data center are guaranteed to be assigned to a single container. Essentially, there's two levels of assignment going on: partition -> container, and partition -> TI. We only control the partition -> TI mapping with this setting, and if the partition -> container mapping is incorrect, you're going to get weird (wrong) results.

          In this example, that assignment could be guaranteed; it would be up to the implementor. All the pluggable function would do is take a set of TPs and group them into inputs for a particular TI. For example, assuming four topics (DC1-Pageview, DC2-Pageview, DC1-Click, DC2-Click), each partitioned two ways, the function could either generate two sets

          {DC1-Pageview/0, DC1-Click/0, DC1-Pageview/1, DC1-Click/1},  {DC2-Pageview/0, DC2-Click/0, DC2-Pageview/1, DC2-Click/1}

          or four

           {DC1-Pageview/0, DC1-Click/0}, {DC1-Pageview/1, DC1-Click/1}, {DC2-Pageview/0, DC2-Click/0}, {DC2-Pageview/1, DC2-Click/1} 

          to preserve the partition grouping (i.e., GROUP BY substring(topic, 3) versus GROUP BY substring(topic, 3), partition). Either way, each of those sets would result in the creation of a new TI, to be run somewhere. The pluggable code would have access to all the TPs and could group as appropriate.
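
          A rough sketch of what such a pluggable grouping function could look like (the interface name, "topic/partition" string encoding, and prefix logic are purely illustrative):

          import java.util.Collection;
          import java.util.List;
          import java.util.stream.Collectors;

          // Illustrative only: group TPs by a data-center prefix in the topic name,
          // e.g. "DC1-Pageview/0" and "DC1-Click/1" land in the same group (and hence TI).
          interface TaskGrouper {
            Collection<List<String>> group(List<String> topicPartitions); // "topic/partition" strings
          }

          class DataCenterGrouper implements TaskGrouper {
            public Collection<List<String>> group(List<String> topicPartitions) {
              // GROUP BY substring(topic, 3): everything before the first '-' is the DC.
              return topicPartitions.stream()
                  .collect(Collectors.groupingBy(tp -> tp.substring(0, tp.indexOf('-'))))
                  .values();
            }
          }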

          5. Do you want to sub-task this to keep the checkpoint refactor and the partition strategy changes?

          Yep.

          criccomini Chris Riccomini added a comment -

          Cool, we can skip backwards compatibility for the checkpoint log. Just thinking this through.

          Regarding the pluggability, I take your point. I'm just saying it's a little confusing and complicated, but I agree it should work. Let's punt on it for now, anyway, if you're OK with that.

          criccomini Chris Riccomini added a comment -

          As part of SAMZA-194, we agreed to switch the encoding of the container's partition assignment to JSON instead of our custom format.

          criccomini Chris Riccomini added a comment -

          Resolved as part of SAMZA-123. If we decide we want the fancy GCD style partitioning, we can open a separate ticket for that.


            People

            • Assignee:
              jghoman Jakob Homan
              Reporter:
              criccomini Chris Riccomini
            • Votes:
              0
              Watchers:
              5
