  1. Samza
  2. SAMZA-882

Detect partition count changes in input streams

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.0
    • Fix Version/s: 0.10.1
    • Component/s: None
    • Labels: None

      Description

      This is a known issue: any change in the partition count of an upstream input stream affects the Samza job, which then needs to be restarted. In such scenarios, we experience data loss or incorrect processing because the application logic depends on the partitioning strategy. It is made worse by the fact that we don't have a good mechanism to detect such a change.

      As a first step towards detection, I propose that we modify the stream metadata cache maintained in Samza such that when there is a change in partition count, we increment a gauge metric. This way we can at least attach a hook to monitor when this happens and take the necessary actions.

      However, in the long-term, we need to come up with a better strategy for handling this.
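
      The detection step proposed above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the attached patches: the class `PartitionCountMonitor`, its `observe` method, and the use of an `AtomicInteger` as a stand-in for a gauge metric are all hypothetical names chosen for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; not a class from Samza or from the SAMZA-882 patch. */
class PartitionCountMonitor {
    // Stand-in for a gauge metric; a monitoring hook could alert when it is non-zero.
    private final AtomicInteger changeGauge = new AtomicInteger(0);
    // Last partition count observed for each stream name.
    private final Map<String, Integer> lastSeen = new HashMap<>();

    /**
     * Records the latest partition count for a stream, e.g. after a metadata refresh.
     * Returns true if the count differs from the previously cached value.
     */
    synchronized boolean observe(String stream, int partitionCount) {
        Integer previous = lastSeen.put(stream, partitionCount);
        if (previous != null && previous != partitionCount) {
            changeGauge.incrementAndGet();
            return true;
        }
        return false;
    }

    /** Number of partition count changes seen so far. */
    int changeCount() {
        return changeGauge.get();
    }
}
```

      The first observation of a stream only seeds the cache, so the counter moves only when a later refresh reports a different count. This sketch detects the change and does nothing else; it does not rebalance or restart anything.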

      1. SAMZA-882-0.patch
        35 kB
        Navina Ramesh
      2. SAMZA-882-1.patch
        35 kB
        Navina Ramesh

          Activity

          jarradk Ken added a comment -

          JIRA issue SAMZA-882 is applicable to new partitions or re-partitioning. In this comment I would like to extend the conversation to include partitions that become dormant or obsolete. If there is a better venue for this conversation please direct me to it.

          In the Samza examples, the partition key is often user id. It is reasonable to expect users to enter and exit an organisation. Thus new partitions will be created and existing partitions will become dormant. Real world scenarios are 'employee leaves company' or 'customer closes account'.

          One StreamTask instance and one Thread instance are created by a Samza Container per partition (for simplicity, this scenario is a Samza Job with a single input stream). Thus all dormant partitions (of the input stream) will be polled for new messages (and yet there will be no new messages).

          What is the opinion of Samza architects regarding the resources that are allocated to dormant partitions? The resources are: processing via polling for new messages and switching threads; heap memory (probably not much). The load balancing via Yarn could be uneven (e.g. one node could be assigned a disproportionate number of dormant partitions).

          If I have misunderstood any concept please inform me (I am a Samza newbie).

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Ken, thanks for your input. IMO, the "dormant partition" problem you described would be better handled by a correct choice of hash function in partitioning. In your example, when you use user id as the partition key, some number M of users are mapped to a single partition, based on some hash function. Hence, a "dormant partition" can only happen if all M users "leave the company". With a good choice of hash function, the probability that all user ids mapped to a single partition id become invalid is usually very low.
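
          To make the hash-partitioning point concrete, here is a minimal sketch, assuming keys are mapped to partitions by `hashCode` modulo the partition count. The class and method names are hypothetical, and the actual partitioner used by a given producer may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of key-to-partition hashing; not Samza or Kafka code. */
class HashPartitioningSketch {
    /** Maps a key to a partition index in [0, partitionCount). */
    static int partitionFor(String userId, int partitionCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(userId.hashCode(), partitionCount);
    }

    /** Counts how many of the given user ids land on each partition. */
    static Map<Integer, Integer> usersPerPartition(Iterable<String> userIds, int partitionCount) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String id : userIds) {
            counts.merge(partitionFor(id, partitionCount), 1, Integer::sum);
        }
        return counts;
    }
}
```

          Each partition serves many user ids, so it goes quiet only when every one of them stops producing messages. The same mapping also shows why a partition count change matters: `partitionFor` depends on `partitionCount`, so the same user id can land on a different partition once the count changes.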

          navina Navina Ramesh added a comment -

          Fixed the bugs and put up a patch. RB here -> https://reviews.apache.org/r/44405/

          navina Navina Ramesh added a comment -

          To add to what Yi pointed out regarding the hash function, I think a stream processor should be agnostic to partitioning semantics in the input stream. Or, at a minimum, it should be able to detect partition count changes and rebalance. This JIRA is just a step towards rebalancing. We want to enable detecting this change in the first place.

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Navina Ramesh, I have a few comments on the RB. Thanks!

          navina Navina Ramesh added a comment -

          Attaching patch after addressing Yi's feedback from the RB.

          navina Navina Ramesh added a comment -

          Committed the patch for detecting input stream partition count change. Resolving this for now.


            People

            • Assignee: Navina Ramesh (navina)
            • Reporter: Navina Ramesh (navina)
            • Votes: 0
            • Watchers: 4