SAMZA-71: Support new partitioning strategies



    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.6.0
    • 0.8.0
    • container


      Currently, the number of StreamTask instances created for a Samza job is equal to the maximum partition count across all input streams. For example, if your Samza job uses two YARN containers and has two input streams (IS1 and IS2), where IS1 has 4 partitions and IS2 has 8 partitions, then the Samza job will have a total of max(4,8)=8 partitions. Therefore, 8 StreamTask instances would be created (spread as evenly as possible across the two YARN containers).

      This scheme works for co-grouping when both input streams have the same number of partitions, and are partitioned using the same partitioning scheme (e.g. a Kafka partitioner that partitions by member ID). The parallelism of the job is limited by the number of StreamTask instances, which means that the parallelism is limited by the max number of partitions across all input streams (8 in the example above).

      We can actually do better than these guarantees. We should change Samza's partitioning style to behave in the following way:

      1. Support a task.partition.scheme=max config. Samza will create one StreamTask instance for each input stream partition. In the example above, IS1 has 4 partitions, and IS2 has 8 partitions, so Samza would create 4+8=12 StreamTasks. Each input stream partition would be mapped to a unique StreamTask that would receive messages ONLY from that input stream/partition pair, and from no other. Using this style of partitioning increases a Samza job's possible parallelism to the absolute maximum (based on Kafka semantics, which allow only a single consumer for each input stream/partition pair).

      2. Support a task.partition.scheme=cogroup config. Samza will create one StreamTask instance for each unit of the greatest common divisor (GCD) of all input stream partition counts. In the example above, IS1 has 4 partitions, and IS2 has 8. GCD(4,8)=4, so Samza would create four StreamTask instances. If IS1 had 4 partitions, and IS2 had 6, then GCD(4,6)=2, so the Samza job would have two StreamTask instances. Using this style can decrease a Samza job's parallelism, but provides the guarantee that a StreamTask instance will receive all messages across all input streams for a key that it's in charge of. For example, if a StreamTask is consuming AdViews and AdClicks, and both are partitioned by member ID, but AdViews has 12 partitions, and AdClicks has 8 partitions, then there will be GCD(12,8)=4 StreamTask instances. Each instance will receive roughly 1/4th of all clicks and views, and all clicks and views for a given member ID will be mapped to just one of the StreamTasks, so aggregation/joining will be possible.

      When the user hasn't specified a partition scheme, task.partition.scheme will default to max. Thus, by default, no aggregation or joining across input streams will be possible.

      With both of these styles, we can still use the Partition class (and getPartitionId) to identify each StreamTask instance, but we will need to devise a deterministic way to map from each input stream/partition pair to each StreamTask partition.

      In the case of style #1 (max), consider the case where we have IS1 with 4 partitions and IS2 with 8 partitions. We can use the order of task.inputs to define an ordering across stream names. We can then instantiate all 12 StreamTasks, and simply iterate over all input streams, in task.inputs order, and over each stream's sorted partition set to do the mapping. If we had task.inputs=IS2,IS1, the mapping would look like this:

      IS2:0 - StreamTask:0
      IS2:1 - StreamTask:1
      IS2:2 - StreamTask:2
      IS2:3 - StreamTask:3
      IS2:4 - StreamTask:4
      IS2:5 - StreamTask:5
      IS2:6 - StreamTask:6
      IS2:7 - StreamTask:7
      IS1:0 - StreamTask:8
      IS1:1 - StreamTask:9
      IS1:2 - StreamTask:10
      IS1:3 - StreamTask:11
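
The max mapping above can be sketched in a few lines of Java. This is only an illustration of the iteration order described in this ticket, not Samza's implementation: the class MaxSchemeSketch, the method assign, and the "stream:partition" string keys are all hypothetical names chosen for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "max" scheme: one StreamTask per stream/partition pair.
final class MaxSchemeSketch {
    // Walks streams in task.inputs order and partitions in ascending order,
    // handing out consecutive task IDs starting at 0.
    static Map<String, Integer> assign(List<String> inputs, Map<String, Integer> partitionCounts) {
        Map<String, Integer> mapping = new LinkedHashMap<>();
        int nextTaskId = 0;
        for (String stream : inputs) {
            int count = partitionCounts.get(stream);
            for (int partition = 0; partition < count; partition++) {
                mapping.put(stream + ":" + partition, nextTaskId++);
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        // task.inputs=IS2,IS1 with IS1 having 4 partitions and IS2 having 8.
        Map<String, Integer> mapping =
            assign(List.of("IS2", "IS1"), Map.of("IS1", 4, "IS2", 8));
        mapping.forEach((pair, task) -> System.out.println(pair + " - StreamTask:" + task));
    }
}
```

Because the task IDs depend only on the stream order and the partition counts, the mapping stays deterministic as long as both remain unchanged.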

      In the case of style #2 (cogroup), consider the case where IS1 has 8 partitions and IS2 has 12 partitions. GCD(8,12)=4, so 4 StreamTasks would be created. The mapping in this case should then be:

      IS1:0 - StreamTask:0
      IS1:1 - StreamTask:1
      IS1:2 - StreamTask:2
      IS1:3 - StreamTask:3
      IS1:4 - StreamTask:0
      IS1:5 - StreamTask:1
      IS1:6 - StreamTask:2
      IS1:7 - StreamTask:3
      IS2:0 - StreamTask:0
      IS2:1 - StreamTask:1
      IS2:2 - StreamTask:2
      IS2:3 - StreamTask:3
      IS2:4 - StreamTask:0
      IS2:5 - StreamTask:1
      IS2:6 - StreamTask:2
      IS2:7 - StreamTask:3
      IS2:8 - StreamTask:0
      IS2:9 - StreamTask:1
      IS2:10 - StreamTask:2
      IS2:11 - StreamTask:3

      As you can see, the assignment is done by taking each input stream's partition number modulo the GCD value (4, in this case). This assignment strategy has the nice guarantee that keys will map to the same StreamTask across input streams with different partition counts (provided that they're partitioned by the same key, and that the partitioner assigns a key to partition key % partition count). For example, member ID 1213 % 8 = partition 5 in IS1, and 1213 % 12 = partition 1 in IS2. If you then mod by the GCD (4), you get 5%4=1 and 1%4=1. The same holds true for other keys, as well.

      1211%8=3 .. 3%4=3
      1211%12=11 .. 11%4=3
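
The arithmetic above can be checked with a small Java sketch. It assumes, as the examples in this ticket do, that a key lands on partition key % partitionCount and that keys are non-negative; the class CogroupSchemeSketch and the method taskFor are hypothetical names used only for illustration.

```java
// Hypothetical sketch of the "cogroup" scheme's partition-to-task mapping.
final class CogroupSchemeSketch {
    // Assumed partitioner: key % partitionCount picks the partition,
    // then partition % taskCount picks the StreamTask.
    static int taskFor(int key, int partitionCount, int taskCount) {
        int partition = key % partitionCount;
        return partition % taskCount;
    }

    public static void main(String[] args) {
        int taskCount = 4; // GCD(8, 12)
        for (int key : new int[] {1213, 1211}) {
            int viaIs1 = taskFor(key, 8, taskCount);
            int viaIs2 = taskFor(key, 12, taskCount);
            System.out.println("key " + key + ": IS1 -> StreamTask:" + viaIs1
                + ", IS2 -> StreamTask:" + viaIs2);
        }
    }
}
```

The two results agree for every non-negative key because the task count divides both partition counts, so both expressions reduce to key % taskCount.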

      Both of these partition assignment schemes work only as long as the task.inputs stream order is static (or new streams are only appended to the end), and each input stream's partition count is static and never changes.

      You can use the Euclidean algorithm to find the GCD:
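
A minimal Java sketch of the Euclidean algorithm, extended to fold over any number of partition counts. The class GcdSketch and its method names are hypothetical and chosen for this example; positive partition counts are assumed.

```java
// Hypothetical helper for computing the StreamTask count under the cogroup scheme.
final class GcdSketch {
    // Euclidean algorithm: repeatedly replace (a, b) with (b, a % b) until b is 0.
    static int gcd(int a, int b) {
        while (b != 0) {
            int remainder = a % b;
            a = b;
            b = remainder;
        }
        return a;
    }

    // Folds gcd over all partition counts; starting from 0 works because gcd(0, x) == x.
    static int gcdOfAll(int... partitionCounts) {
        int result = 0;
        for (int count : partitionCounts) {
            result = gcd(result, count);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("GCD(4,8)  = " + gcdOfAll(4, 8));
        System.out.println("GCD(4,6)  = " + gcdOfAll(4, 6));
        System.out.println("GCD(8,12) = " + gcdOfAll(8, 12));
    }
}
```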





            jghoman Jakob Homan
            criccomini Chris Riccomini