Beam / BEAM-8347

UnboundedRabbitMqReader can fail to advance watermark if no new data comes in


Details

• Type: Bug
• Status: Resolved
• Priority: Major
• Resolution: Fixed
• Affects Version/s: 2.15.0
• Fix Version/s: 2.18.0
• Component/s: io-java-rabbitmq
• Labels: None
• Environment: Testing has been done using the DirectRunner. I also have the DataflowRunner available.

Description

I stumbled upon this and then saw a similar StackOverflow post: https://stackoverflow.com/questions/55736593/apache-beam-rabbitmqio-watermark-doesnt-advance

When `advance()` is called and there are no messages, no state changes at all: neither the CheckpointMark nor the watermark is updated. If new messages arrive at a relatively constant rate, this is not a problem. If data is bursty, however, the watermark stays put during every period with no new messages, so windows that end in that period cannot fire their on-time panes until the next message arrives. If no further message ever arrives, the watermark never advances.
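
To make the difference concrete, here is a minimal plain-Java sketch (no Beam dependency) contrasting watermark bookkeeping that ignores empty polls with bookkeeping that advances during idle periods. The class and method names (`StallingWatermark`, `IdleAdvancingWatermark`, `onMessage`, `onEmptyPoll`) are hypothetical. The idle rule assumes messages are timestamped on arrival, so once the queue is empty no later message can carry a timestamp earlier than the current time. It illustrates the idea only and is not the change that shipped in 2.18.0.

```java
// Hypothetical sketch: unbounded-reader watermark bookkeeping reduced to plain Java.
// All timestamps are epoch milliseconds.

/** Mirrors the reported behavior: an empty poll leaves the watermark unchanged. */
class StallingWatermark {
    private long watermark = Long.MIN_VALUE;

    void onMessage(long eventTimeMillis) {
        // Only an incoming message can move the watermark forward.
        watermark = Math.max(watermark, eventTimeMillis);
    }

    void onEmptyPoll(long nowMillis) {
        // No message, so no state change at all.
    }

    long watermark() {
        return watermark;
    }
}

/**
 * Assumes messages are timestamped on arrival: an empty queue means no future
 * message can be older than "now", so the watermark may move up to it.
 */
class IdleAdvancingWatermark {
    private long watermark = Long.MIN_VALUE;

    void onMessage(long eventTimeMillis) {
        watermark = Math.max(watermark, eventTimeMillis);
    }

    void onEmptyPoll(long nowMillis) {
        // Advance to the current time, but never move the watermark backwards.
        watermark = Math.max(watermark, nowMillis);
    }

    long watermark() {
        return watermark;
    }
}
```

For example, after messages at 1,000 ms and 2,000 ms followed by an empty poll at 15,000 ms, `StallingWatermark` still reports 2,000 ms while `IdleAdvancingWatermark` reports 15,000 ms. Taking the maximum keeps the watermark monotonic even if a later empty poll sees an earlier clock reading.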

Contrast this with PubsubIO, which has logic to advance the watermark during periods of inactivity (although it, too, is imperfect: https://issues.apache.org/jira/browse/BEAM-7322).

The example given in the StackOverflow post is roughly the following:

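    import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
    import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // `pipeline` is an existing org.apache.beam.sdk.Pipeline.
    pipeline
        .apply(RabbitMqIO.read()
            .withUri("amqp://guest:guest@localhost:5672")
            .withQueue("test"))
        // 10-second fixed windows; the on-time pane fires once the watermark passes the window end.
        .apply("Windowing",
            Window.<RabbitMqMessage>into(
                FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());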

If I push 2 messages into my RabbitMQ queue, I see 2 unacknowledged messages and a window that never fires its on-time pane.
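
The stuck window follows from simple arithmetic. With 10-second fixed windows, two messages timestamped at, say, 1,000 ms and 2,000 ms (hypothetical values) both fall into the window [0 ms, 10,000 ms). `AfterWatermark.pastEndOfWindow()` fires the on-time pane only once the watermark passes that window's last timestamp, 9,999 ms, so a watermark stuck at 2,000 ms never satisfies it. The plain-Java sketch below models this condition; `FixedWindowFiring` and its methods are hypothetical helpers that mimic, but are not, Beam's `FixedWindows` and trigger code, and a window offset of 0 is assumed.

```java
// Hypothetical helper: models 10-second fixed windows and the on-time firing condition.
// It mimics, but is not, Beam's FixedWindows / AfterWatermark implementation.
final class FixedWindowFiring {
    private FixedWindowFiring() {}

    /** Inclusive start of the fixed window containing the timestamp (offset 0 assumed). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorDiv keeps negative timestamps in the correct window.
        return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
    }

    /** Exclusive end of that window. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /** The on-time pane fires once the watermark is past the window's last millisecond (end - 1). */
    static boolean onTimePaneFires(long watermarkMillis, long windowEndMillis) {
        return watermarkMillis > windowEndMillis - 1;
    }
}
```

With a watermark stuck at 2,000 ms, `onTimePaneFires(2_000, 10_000)` is false; only a watermark of at least 10,000 ms makes it true.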

People

• Assignee: Jean-Baptiste Onofré (jbonofre)
• Reporter: Daniel Robert (drobert)
• Votes: 0
• Watchers: 2

Time Tracking

• Estimated: Not Specified
• Remaining: 0h
• Logged: 3h 50m