Flink / FLINK-19857 (FLIP-149: Introduce the upsert-kafka Connector) / FLINK-19878

WatermarkAssigner shouldn't be placed after ChangelogNormalize


Details

    Description

      Currently, for an upsert source such as upsert-kafka, the WatermarkAssigner is placed after the ChangelogNormalize node. As a result, it may emit Long.MaxValue as the watermark if some parallel instance has no data.

       

         +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
            +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
               +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
                  +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
                     +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
      

       

      As an improvement, we can move the WatermarkAssigner directly after the TableSourceScan node, so that watermarks are generated the same way as for a general source.
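      The proposed node reordering can be illustrated with a small, self-contained Java sketch. It does not use Flink's planner API: the plan is modelled as a hypothetical top-down list of node names mirroring the plan above, and `moveWatermarkAssignerToSource` is an illustrative helper, not a Flink function. It only shows the intended order, with the WatermarkAssigner below ChangelogNormalize and directly on top of the TableSourceScan.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, not Flink's planner API) of the proposed rewrite.
 * The plan is a linear chain of node names listed top-down, like the plan
 * printout in the issue, and node attributes (keys, rowtime, changelog mode)
 * are deliberately ignored.
 */
final class WatermarkPushDownSketch {

    static final String WATERMARK = "WatermarkAssigner";
    static final String SCAN = "TableSourceScan";

    /** Returns a new top-down chain with the WatermarkAssigner placed directly above the scan. */
    static List<String> moveWatermarkAssignerToSource(List<String> topDown) {
        List<String> result = new ArrayList<>(topDown);
        // remove(Object) drops the first WatermarkAssigner; if there is none, nothing moves.
        if (!result.remove(WATERMARK)) {
            return result;
        }
        int scanIndex = result.indexOf(SCAN);
        if (scanIndex < 0) {
            throw new IllegalArgumentException("plan has no " + SCAN);
        }
        // In a top-down list, inserting at the scan's index puts the node immediately above the scan.
        result.add(scanIndex, WATERMARK);
        return result;
    }

    public static void main(String[] args) {
        // Current plan from the issue: the WatermarkAssigner sits above ChangelogNormalize.
        List<String> current = List.of(
            "Exchange", WATERMARK, "ChangelogNormalize", "Exchange", SCAN);
        System.out.println(moveWatermarkAssignerToSource(current));
    }
}
```

      Running `main` prints `[Exchange, ChangelogNormalize, Exchange, WatermarkAssigner, TableSourceScan]`, i.e. watermarks would be assigned on the source's own parallel instances before the hash Exchange and ChangelogNormalize, as for a general source.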

       

            People

              Jark Wu (jark)
              Leonard Xu (leonard)
              Votes: 0
              Watchers: 3
