SAMZA-1627

Watermark broadcast enhancements


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.14.0, 0.14.1
    • Component/s: None
    • Labels: None

      Description

      Currently, each upstream task needs to broadcast to every single partition of the intermediate streams in order to aggregate watermarks in the consumers. This is O(n^2): for 256 tasks and a 256-partition intermediate stream, it can easily result in 64k msg/s if we send a watermark every second. To illustrate:

      T1 -> P1, P2, P3
      T2 -> P1, P2, P3
      T3 -> P1, P2, P3

      (every task broadcasts its watermark to every partition)

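      To make the cost concrete, here is a minimal sketch of the current all-to-all scheme. The WatermarkMessage and Producer types below are hypothetical stand-ins, not Samza's actual control-message or system-producer API:

      {code:java}
      // Minimal sketch of the all-to-all scheme; WatermarkMessage and Producer
      // are hypothetical stand-ins, not Samza's actual API.
      public class NaiveWatermarkBroadcast {

        // Assumed control message carrying the emitting task and its watermark.
        record WatermarkMessage(String taskName, long watermarkMs) {}

        // Assumed transport: sends one message to one partition of a stream.
        interface Producer {
          void send(String stream, int partition, WatermarkMessage msg);
        }

        // Each task performs n sends per tick; with n tasks, that is O(n^2)
        // watermark messages overall.
        static void broadcastWatermark(Producer producer, String stream,
            String taskName, long watermarkMs, int partitionCount) {
          for (int p = 0; p < partitionCount; p++) {
            producer.send(stream, p, new WatermarkMessage(taskName, watermarkMs));
          }
        }

        public static void main(String[] args) {
          // 256 tasks x 256 partitions = 65,536 sends per tick; at one watermark
          // per second, that is the ~64k msg/s cited above.
          System.out.println(256 * 256 + " msg/s at one watermark per second");
        }
      }
      {code}
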
      A better way to do this is to have only one downstream consumer do the aggregation and then broadcast the result to all the partitions. This is safe, as a simple proof shows: if P1 has received a watermark of t from all of T1, T2, and T3, then all the messages before t have already been published to (P1, P2, P3), though they might not have been consumed yet. So P1 can safely broadcast the watermark t to P2 and P3. To illustrate:

      T1    T2    T3
        \    |    /
         \   |   /
            P1
           /  \
          P2   P3

      This reduces the total message count from O(n^2) to O(n): each of the n tasks sends one message to the aggregator, which then sends n - 1 messages back out. The cost is a few milliseconds of extra delay, since the message is exchanged twice. The benefit clearly outweighs the cost. In practice, the aggregating consumer can be chosen as (topic.hash() % total partitions) to spread the aggregation across partitions when there are multiple intermediate streams.
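
      Below is a minimal sketch of the proposed scheme, using the same hypothetical types as above: each task sends its watermark only to the aggregator partition chosen by the topic-hash rule, and that partition's consumer rebroadcasts the aggregate once every upstream task has reported. This illustrates the idea, not the actual patch:

      {code:java}
      // Sketch of the single-aggregator scheme; all names are assumptions,
      // not Samza's actual API.
      import java.util.HashMap;
      import java.util.Map;

      public class AggregatedWatermarkBroadcast {

        record WatermarkMessage(String taskName, long watermarkMs) {}

        interface Producer {
          void send(String stream, int partition, WatermarkMessage msg);
        }

        // topic.hash() % total partitions, as described above, so the
        // aggregation load spreads across partitions when there are multiple
        // intermediate streams.
        static int aggregatorPartition(String stream, int partitionCount) {
          return Math.floorMod(stream.hashCode(), partitionCount);
        }

        // Upstream side: one send per task per tick (n sends total).
        static void sendWatermark(Producer producer, String stream,
            String taskName, long watermarkMs, int partitionCount) {
          int aggregator = aggregatorPartition(stream, partitionCount);
          producer.send(stream, aggregator,
              new WatermarkMessage(taskName, watermarkMs));
        }

        // Aggregator side: once every upstream task has reported, the minimum
        // watermark is safe to rebroadcast, because every message older than
        // it has already been published to all partitions.
        static final class Aggregator {
          private final Map<String, Long> latestPerTask = new HashMap<>();
          private final int taskCount;

          Aggregator(int taskCount) {
            this.taskCount = taskCount;
          }

          void onWatermark(WatermarkMessage msg, Producer producer,
              String stream, int partitionCount, int selfPartition) {
            latestPerTask.merge(msg.taskName(), msg.watermarkMs(), Math::max);
            if (latestPerTask.size() == taskCount) {
              long aggregate = latestPerTask.values().stream()
                  .mapToLong(Long::longValue).min().orElseThrow();
              // n - 1 further sends, so the total per tick is O(n), not O(n^2).
              for (int p = 0; p < partitionCount; p++) {
                if (p != selfPartition) {
                  producer.send(stream, p,
                      new WatermarkMessage("aggregate", aggregate));
                }
              }
            }
          }
        }
      }
      {code}

      Note the min over per-task watermarks: the aggregate only advances to the slowest task's watermark, which is exactly the t for which the proof above guarantees all earlier messages have been published.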


              People

              • Assignee: Xinyu Liu
              • Reporter: Xinyu Liu
              • Votes: 0
              • Watchers: 2
