Details
- Type: Sub-task
- Status: Closed
- Priority: Major
- Resolution: Fixed
- None
Description
Currently, for an upsert source such as upsert-kafka, the WatermarkAssigner node is placed after the ChangelogNormalize node. As a result, it may return Long.MaxValue as the watermark if some parallel subtasks have no data.
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
      +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
As an improvement, we can move the WatermarkAssigner so that it directly follows the TableSourceScan node; watermarks will then be generated the same way as for a regular source.
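The reordering proposed above can be illustrated with a small standalone sketch. It does not use Flink's planner; it models the plan as a list of node names from the root down to the source, and the class and method names (PlanReorderSketch, moveWatermarkAssignerAboveScan) are hypothetical, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PlanReorderSketch {

    // Hypothetical helper (not a Flink API): takes node names ordered from the
    // plan root down to the source and returns a copy in which WatermarkAssigner
    // sits directly above TableSourceScan.
    static List<String> moveWatermarkAssignerAboveScan(List<String> rootToSource) {
        List<String> result = new ArrayList<>(rootToSource);
        if (!result.remove("WatermarkAssigner")) {
            return result; // no assigner in this plan, nothing to move
        }
        int scan = result.indexOf("TableSourceScan");
        if (scan < 0) {
            return new ArrayList<>(rootToSource); // no scan found, leave the plan unchanged
        }
        result.add(scan, "WatermarkAssigner"); // insert immediately above the scan
        return result;
    }

    public static void main(String[] args) {
        // Node order taken from the plan in the description, root first.
        List<String> current = List.of(
                "Exchange", "WatermarkAssigner", "ChangelogNormalize", "Exchange", "TableSourceScan");
        System.out.println(moveWatermarkAssignerAboveScan(current));
    }
}
```

In the resulting order the WatermarkAssigner sits between the TableSourceScan and the first hash Exchange, so under this model watermarks are assigned before records are redistributed by key.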