Beam / BEAM-591

Better handling of watermark in KafkaIO

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.4.0
    • Component/s: io-java-kafka
    • Labels: None

    Description

      Right now, the default watermark in KafkaIO is the same as the timestamp of the last record read. The main problem with this is that the watermark does not advance if there aren't any new records on the topic. This can hold up many open windows.
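To make the stall concrete, here is a minimal Java model of the behavior described above. It assumes the watermark simply tracks the timestamp of the most recently read record. RecordTimestampWatermark and its methods are hypothetical illustrations, not KafkaIO's actual reader code.

```java
import java.time.Instant;

// Hypothetical model of the current default: watermark == timestamp of the
// most recently read record. Illustrative only, not the KafkaIO implementation.
class RecordTimestampWatermarkDemo {
    public static void main(String[] args) {
        RecordTimestampWatermark wm = new RecordTimestampWatermark();
        wm.onRecord(Instant.parse("2017-01-01T00:00:00Z"));
        wm.onRecord(Instant.parse("2017-01-01T00:00:05Z"));
        // No further records arrive, so every later call returns the same value.
        // Windows ending after 00:00:05 can never be closed.
        System.out.println(wm.getWatermark()); // 2017-01-01T00:00:05Z
        System.out.println(wm.getWatermark()); // 2017-01-01T00:00:05Z
    }
}

class RecordTimestampWatermark {
    // Stays at the minimum instant until the first record is seen.
    private Instant lastRecordTimestamp = Instant.MIN;

    // Called once per record read from the topic.
    void onRecord(Instant recordTimestamp) {
        lastRecordTimestamp = recordTimestamp;
    }

    // Only ever changes when onRecord() is called, i.e. when data arrives.
    Instant getWatermark() {
        return lastRecordTimestamp;
    }
}
```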

      By default, the record timestamp is set to processing time (i.e., the time at which the runner reads the record from the Kafka reader).
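Below is a minimal sketch of processing-time stamping. It assumes an injectable java.time.Clock so the result is deterministic. ProcessingTimeStamper is a hypothetical name, not a KafkaIO class. Combined with the default watermark being the record timestamp, this means the default watermark is the wall-clock time of the last read. It therefore falls further behind real time for as long as the topic stays idle.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical sketch: each record is stamped with the time it was read
// (processing time), regardless of anything inside the record itself.
class ProcessingTimeDemo {
    public static void main(String[] args) {
        // A fixed clock keeps the example deterministic; a real reader would
        // use something like Clock.systemUTC().
        Clock clock = Clock.fixed(Instant.parse("2017-01-01T12:00:00Z"), ZoneOffset.UTC);
        ProcessingTimeStamper stamper = new ProcessingTimeStamper(clock);
        System.out.println(stamper.timestampFor("key=a,value=1")); // 2017-01-01T12:00:00Z
    }
}

class ProcessingTimeStamper {
    private final Clock clock;

    ProcessingTimeStamper(Clock clock) {
        this.clock = clock;
    }

    // The record is deliberately unused: under this default the timestamp
    // depends only on when the read happens.
    <R> Instant timestampFor(R record) {
        return clock.instant();
    }
}
```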

      A user can provide functions to calculate the watermark and record timestamps. There are a few concerns with the current design:

      • What should happen when a Kafka topic is idle?
        • In the default case, I think the watermark should advance to the current time.
        • What should happen when the user has provided a function to calculate record timestamps?
          • Should the watermark stay the same as the record timestamp?
          • And the same when the user has provided their own watermark function?
      • Are the current semantics of the user-provided watermark function correct?
        • It is run once for each record read.
        • Should it instead be run inside getWatermark(), called by the runner? (We could still provide the last user record and its timestamp.)
        • It does run inside getWatermark(). Should we pass the current record timestamp in addition to the record?
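One possible answer to these questions can be sketched in Java under stated assumptions:

- The watermark is computed only when the runner calls getWatermark().
- The user watermark function, if any, receives both the last record and its timestamp.
- In the default case an idle topic advances the watermark to the current time.

How idleness is detected is assumed: it is simply passed in as a flag. IdleAwareWatermark and its methods are hypothetical names, not KafkaIO APIs, and this is not necessarily the design adopted in KafkaIO.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.BiFunction;

// Hypothetical sketch only: class and method names are illustrative, not KafkaIO APIs.
class IdleAwareWatermarkDemo {
    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.parse("2017-01-01T12:00:00Z"), ZoneOffset.UTC);
        Instant readAt = Instant.parse("2017-01-01T11:59:00Z");

        // Default case: no user function, so an idle topic advances to current time.
        IdleAwareWatermark<String> byDefault = new IdleAwareWatermark<>(clock, null);
        byDefault.onRecord("a", readAt);
        System.out.println(byDefault.getWatermark(false)); // 2017-01-01T11:59:00Z
        System.out.println(byDefault.getWatermark(true));  // 2017-01-01T12:00:00Z

        // User function: lag the last record timestamp by 10 seconds, idle or not.
        IdleAwareWatermark<String> custom =
                new IdleAwareWatermark<>(clock, (record, ts) -> ts.minusSeconds(10));
        custom.onRecord("a", readAt);
        System.out.println(custom.getWatermark(true));     // 2017-01-01T11:58:50Z
    }
}

class IdleAwareWatermark<R> {
    private final Clock clock;
    // Optional user function (null means "default"); it receives the last
    // record AND its timestamp.
    private final BiFunction<R, Instant, Instant> watermarkFn;
    private R lastRecord;
    private Instant lastTimestamp = Instant.MIN;
    private Instant watermark = Instant.MIN;

    IdleAwareWatermark(Clock clock, BiFunction<R, Instant, Instant> watermarkFn) {
        this.clock = clock;
        this.watermarkFn = watermarkFn;
    }

    // Per-record bookkeeping only; no watermark logic runs here.
    void onRecord(R record, Instant timestamp) {
        lastRecord = record;
        lastTimestamp = timestamp;
    }

    // Invoked whenever the runner asks for the watermark. The caller is
    // assumed to know whether the topic currently has no unread records.
    Instant getWatermark(boolean topicIdle) {
        Instant candidate;
        if (watermarkFn != null && lastRecord != null) {
            candidate = watermarkFn.apply(lastRecord, lastTimestamp);
        } else if (watermarkFn == null && topicIdle) {
            candidate = clock.instant();
        } else {
            candidate = lastTimestamp;
        }
        // A watermark must never move backwards.
        if (candidate.isAfter(watermark)) {
            watermark = candidate;
        }
        return watermark;
    }
}
```

Leaving idle handling out of the user-function path is just one choice for the open questions above. An alternative would be to pass an idleness flag to the user function so it can decide for itself.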


          People

            Assignee: Raghu Angadi (rangadi)
            Reporter: Raghu Angadi (rangadi)
            Votes: 0
            Watchers: 5

              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 3h 40m