Beam / BEAM-3699

RecordTimestamp should be the default Watermark in KafkaIO

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: Not applicable
    • Component/s: io-java-kafka
    • Labels: None

    Description

      Currently, KafkaIO obtains the watermark Instant in the following order of priority. Each level is used only when the one above it is not set:

      getWatermarkFn().apply(curRecord)
        getTimestampFn().apply(record)
          Instant.now()
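      The current fallback order can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions, not KafkaIO's source: SketchRecord, CurrentWatermarkOrder and watermark are hypothetical names. It uses java.time.Instant instead of the Joda-Time Instant that Beam uses, so that it has no dependencies, and a java.time.Clock parameter stands in for Instant.now() so the last fallback can be tested deterministically. Note that the record's own timestamp is never consulted.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.Function;

// Hypothetical stand-in for a consumed record; not Beam's KafkaRecord.
final class SketchRecord {
    final String value;
    final Instant timestamp; // timestamp carried by the record itself

    SketchRecord(String value, Instant timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

public class CurrentWatermarkOrder {
    // Current order as described in this issue (hypothetical helper):
    // watermarkFn -> timestampFn -> processing time.
    static Instant watermark(SketchRecord rec,
                             Function<SketchRecord, Instant> watermarkFn,
                             Function<SketchRecord, Instant> timestampFn,
                             Clock clock) {
        if (watermarkFn != null) {
            return watermarkFn.apply(rec);
        }
        if (timestampFn != null) {
            return timestampFn.apply(rec);
        }
        // Neither function is set: rec.timestamp is ignored and
        // processing time (Instant.now()) is used instead.
        return Instant.now(clock);
    }

    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.parse("2024-01-01T12:00:00Z"), ZoneOffset.UTC);
        SketchRecord rec = new SketchRecord("v", Instant.parse("2024-01-01T11:59:00Z"));
        // prints 2024-01-01T12:00:00Z (processing time, not the record's timestamp)
        System.out.println(watermark(rec, null, null, clock));
    }
}
```

      With no functions configured, the watermark tracks the wall clock even though each record already carries a usable timestamp. That is the gap this issue describes.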
      

      I propose changing it as follows, so that KafkaRecord.timestamp is used when neither getWatermarkFn() nor getTimestampFn() is set:

      getWatermarkFn().apply(curRecord)
        getTimestampFn().apply(record)
          KafkaRecord(Beam.KafkaIO).timestamp
      

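      Because KafkaRecord.timestamp is taken from the Kafka client record's timestamp and falls back to Instant.now() only when that timestamp is unavailable, this is equivalent to: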

      getWatermarkFn().apply(curRecord)
        getTimestampFn().apply(record)
          KafkaRawRecord(Kafka_client).timestamp
            Instant.now()
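      The proposed order can be sketched the same way. This is again a hypothetical, dependency-free illustration rather than a patch to KafkaIO: RawRecord, ProposedWatermarkOrder and watermark are made-up names, java.time.Instant replaces Beam's Joda-Time Instant, and a Clock stands in for Instant.now(). RawRecord stores its timestamp as epoch milliseconds and uses -1 for "no timestamp", mirroring the Kafka client's ConsumerRecord.NO_TIMESTAMP convention.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.Function;

// Hypothetical stand-in for the raw Kafka client record. The timestamp is
// stored as epoch millis, with -1 meaning "no timestamp available".
final class RawRecord {
    static final long NO_TIMESTAMP = -1L;

    final String value;
    final long timestampMillis;

    RawRecord(String value, long timestampMillis) {
        this.value = value;
        this.timestampMillis = timestampMillis;
    }
}

public class ProposedWatermarkOrder {
    // Proposed order from this issue (hypothetical helper):
    // watermarkFn -> timestampFn -> record timestamp -> processing time.
    static Instant watermark(RawRecord rec,
                             Function<RawRecord, Instant> watermarkFn,
                             Function<RawRecord, Instant> timestampFn,
                             Clock clock) {
        if (watermarkFn != null) {
            return watermarkFn.apply(rec);
        }
        if (timestampFn != null) {
            return timestampFn.apply(rec);
        }
        if (rec.timestampMillis != RawRecord.NO_TIMESTAMP) {
            // New default: the timestamp carried by the Kafka record.
            return Instant.ofEpochMilli(rec.timestampMillis);
        }
        // Last resort: processing time, as in the current behavior.
        return Instant.now(clock);
    }

    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.parse("2024-01-01T12:00:00Z"), ZoneOffset.UTC);
        long recMillis = Instant.parse("2024-01-01T11:59:00Z").toEpochMilli();
        // prints 2024-01-01T11:59:00Z (the record's own timestamp)
        System.out.println(watermark(new RawRecord("v", recMillis), null, null, clock));
        // prints 2024-01-01T12:00:00Z (no record timestamp, so processing time)
        System.out.println(watermark(new RawRecord("v", RawRecord.NO_TIMESTAMP), null, null, clock));
    }
}
```

      User-supplied functions keep their precedence, so pipelines that already set getWatermarkFn() or getTimestampFn() would see no change. Only the default moves from processing time to the record's timestamp.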
      

      rangadi, any comments?

    People

      Assignee: Mingmin Xu (mingmxu)
      Reporter: Mingmin Xu (mingmxu)
      Votes: 0
      Watchers: 1

    Time Tracking

      Original Estimate: Not Specified
      Remaining Estimate: 0h
      Time Spent: 1h