  1. Flink
  2. FLINK-10740 FLIP-27: Refactor Source Interface
  3. FLINK-17899

Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources


Details

    Description

      Preamble: This whole discussion is to some extent only necessary because, in the SourceReader, we pass the SourceOutput as a parameter to the pollNext(...) method. However, this design follows some deeper runtime pipeline design and is not easy to change at this stage.

       

      There are some principal design choices here:

       

      (1) Do we make Timestamps and Watermarks purely a feature of the library (ConnectorBase), or do we integrate them with the core (SourceOperator)?

      Making it purely a responsibility of the ConnectorBase would have the advantage of keeping the SourceOperator simple. However, there is value in integrating this with the SourceOperator.

      • Implementations that are not using the ConnectorBase (like simple collection- or iterator-based sources) would automatically get access to the pluggable TimestampExtractors and WatermarkGenerators.
      • When executing batch programs, the SourceOperator can transparently inject a "no-op" WatermarkGenerator to make sure no Watermarks are generated during batch execution. Given that batch sources are very performance sensitive, it seems useful to not even run the watermark generator logic, rather than dropping the watermarks later.
      • In a future version, we may want to implement "global watermark holds" generated by the Enumerators: The enumerator tells the readers how far they may advance their local watermarks. This can help to not prematurely advance the watermark based on a split's records when other splits have data overlapping with older ranges. An example where this is commonly the case is the streaming file source.
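
      The batch-mode point above can be illustrated with a minimal sketch. The interfaces below are simplified, hypothetical stand-ins for the FLIP-126 concepts (they are not the actual Flink interfaces), and the selector class is an assumption about how the operator might pick the generator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the FLIP-126 concepts; not the actual Flink API.
interface WatermarkSink {
    void emitWatermark(long watermark);
}

interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkSink sink);
    void onPeriodicEmit(WatermarkSink sink);
}

/** Generator that never emits anything; a candidate for injection in batch execution. */
final class NoOpWatermarkGenerator<T> implements SimpleWatermarkGenerator<T> {
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkSink sink) {}

    @Override
    public void onPeriodicEmit(WatermarkSink sink) {}
}

/** Example user generator: emits the highest timestamp seen so far on periodic emit. */
final class MaxTimestampGenerator<T> implements SimpleWatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkSink sink) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkSink sink) {
        if (maxTimestamp != Long.MIN_VALUE) {
            sink.emitWatermark(maxTimestamp);
        }
    }
}

/** Hypothetical helper: the operator swaps in the no-op generator for batch jobs. */
final class WatermarkGeneratorSelector {
    static <T> SimpleWatermarkGenerator<T> select(
            boolean batchExecution, SimpleWatermarkGenerator<T> userGenerator) {
        return batchExecution ? new NoOpWatermarkGenerator<T>() : userGenerator;
    }
}

final class NoOpWatermarkDemo {
    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        SimpleWatermarkGenerator<String> generator =
                WatermarkGeneratorSelector.select(true, new MaxTimestampGenerator<String>());
        generator.onEvent("a", 5L, wm -> emitted.add(wm));
        generator.onPeriodicEmit(wm -> emitted.add(wm));
        System.out.println("watermarks emitted in batch mode: " + emitted.size());
    }
}
```

      With this shape, the user-supplied generator logic is never invoked in batch mode, so there is nothing to drop later.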

       

      (2) Is the per-partition watermarking purely a feature of the library (ConnectorBase), or do we integrate it with the core (SourceOperator)?

      I believe we need to solve this on the same level as the previous question:

      • Once a connector instantiates the per-partition watermark generators, the main output (through which the SourceReader emits the records) must no longer run its own watermark generator. Otherwise, watermarks would also be extracted from the merged stream, which undermines the per-partition watermarks. So having the per-partition watermark generators only in the ConnectorBase and emitting transparently through an unchanged main output would not work.
      • So, if we decide to implement watermark support in the core (SourceOperator), we would need to offer the per-partition watermarking utilities on that level as well.
      • Along a similar line of thought as in the previous point, batch execution can optimize the watermark extraction by also supplying no-op extractors for the per-partition extractors (which will most likely bear the bulk of the load in the connectors).
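
      To make the merged-stream problem concrete, here is a small sketch of how per-partition watermarks might be combined. The class and method names are hypothetical, and taking the minimum across splits is an assumption about the combination rule, not something this issue specifies.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: tracks one local watermark per split and exposes the
 * minimum across all registered splits as the combined watermark.
 */
final class PerSplitWatermarkCombiner {
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private long combined = Long.MIN_VALUE;

    /** Registers a split that has not produced a watermark yet. */
    void register(String splitId) {
        splitWatermarks.putIfAbsent(splitId, Long.MIN_VALUE);
    }

    /** Records a split-local watermark and returns the combined watermark. */
    long update(String splitId, long watermark) {
        // A split-local watermark never moves backwards.
        splitWatermarks.merge(splitId, watermark, Math::max);
        long min = Long.MAX_VALUE;
        for (long w : splitWatermarks.values()) {
            min = Math.min(min, w);
        }
        // The combined watermark never moves backwards either.
        combined = Math.max(combined, min);
        return combined;
    }
}

final class PerSplitWatermarkDemo {
    public static void main(String[] args) {
        PerSplitWatermarkCombiner combiner = new PerSplitWatermarkCombiner();
        combiner.register("split-A");
        combiner.register("split-B");
        combiner.update("split-A", 100L);
        // split-B is still at 40, so the combined watermark stays at 40.
        System.out.println(combiner.update("split-B", 40L));
        // Once split-B moves past split-A, split-A becomes the limiting split.
        System.out.println(combiner.update("split-B", 120L));
    }
}
```

      A generator running on the merged stream would have seen timestamp 100 from split-A first and could have advanced past split-B's older records, which is the situation the first bullet warns about.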

       

      (3) What would an integration of WatermarkGenerators with the SourceOperator look like?

      Rather straightforward: the SourceOperator instantiates a SourceOutput that internally runs the timestamp extractor and watermark generator and emits to the DataOutput that the operator emits to.
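
      A rough sketch of such an output follows. All types here are simplified, hypothetical stand-ins (DownstreamOutput plays the role of the DataOutput, and NO_TIMESTAMP is an assumed sentinel); the real Flink interfaces differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical, simplified stand-ins; not the actual Flink interfaces.
interface DownstreamOutput<T> {
    void emitRecord(T record, long timestamp);
    void emitWatermark(long watermark);
}

interface TimestampExtractor<T> {
    long extractTimestamp(T record, long currentTimestamp);
}

interface EventWatermarkGenerator<T> {
    void onEvent(T record, long timestamp, LongConsumer watermarkOutput);
}

/** Output that runs the extractor and generator before forwarding downstream. */
final class TimestampingSourceOutput<T> {
    static final long NO_TIMESTAMP = Long.MIN_VALUE; // assumed sentinel value

    private final DownstreamOutput<T> downstream;
    private final TimestampExtractor<T> extractor;
    private final EventWatermarkGenerator<T> generator;
    private long lastWatermark = Long.MIN_VALUE;

    TimestampingSourceOutput(
            DownstreamOutput<T> downstream,
            TimestampExtractor<T> extractor,
            EventWatermarkGenerator<T> generator) {
        this.downstream = downstream;
        this.extractor = extractor;
        this.generator = generator;
    }

    void collect(T record) {
        collect(record, NO_TIMESTAMP);
    }

    void collect(T record, long timestamp) {
        long extracted = extractor.extractTimestamp(record, timestamp);
        downstream.emitRecord(record, extracted);
        generator.onEvent(record, extracted, this::emitWatermarkIfAdvanced);
    }

    private void emitWatermarkIfAdvanced(long watermark) {
        if (watermark > lastWatermark) {
            lastWatermark = watermark;
            downstream.emitWatermark(watermark);
        }
    }
}

final class TimestampingSourceOutputDemo {
    /** Feeds records that carry their own timestamp and logs what reaches downstream. */
    static List<String> run(long... events) {
        List<String> log = new ArrayList<>();
        DownstreamOutput<Long> downstream = new DownstreamOutput<Long>() {
            @Override
            public void emitRecord(Long record, long timestamp) {
                log.add("R:" + record + "@" + timestamp);
            }

            @Override
            public void emitWatermark(long watermark) {
                log.add("W:" + watermark);
            }
        };
        TimestampingSourceOutput<Long> output = new TimestampingSourceOutput<>(
                downstream,
                (record, current) -> record,                       // record value is its timestamp
                (record, ts, wmOut) -> wmOut.accept(ts - 1));      // watermark trails by one
        for (long event : events) {
            output.collect(event);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [R:10@10, W:9, R:7@7, R:12@12, W:11]
        System.out.println(run(10, 7, 12));
    }
}
```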

       

      (4) What would an integration of the per-split WatermarkGenerators look like?

      I would propose to introduce an interface ReaderMainOutput which extends SourceOutput and adds methods to create and release per-split outputs. The SourceReader should accept a ReaderMainOutput instead of a SourceOutput.

       

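      public interface ReaderMainOutput<T> extends SourceOutput<T> {

         // Emits a record through the main output, without an explicit timestamp.
         @Override
         void collect(T record);

         // Emits a record through the main output, with the given timestamp.
         @Override
         void collect(T record, long timestamp);

         // Creates an output for the records of the given split (for per-split watermarking).
         SourceOutput<T> createOutputForSplit(String splitId);

         // Releases the output that was created for the given split.
         void releaseOutputForSplit(String splitId);
      }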
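
      As an illustration of how a reader might use this interface, the sketch below restates the two interfaces in minimal form and adds a hypothetical in-memory implementation. RecordingMainOutput, the split ids, and the choice to return the same output for repeated calls with the same split id are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal restatement of the proposed interfaces, so the example is self-contained.
interface SourceOutput<T> {
    void collect(T record);
    void collect(T record, long timestamp);
}

interface ReaderMainOutput<T> extends SourceOutput<T> {
    SourceOutput<T> createOutputForSplit(String splitId);
    void releaseOutputForSplit(String splitId);
}

/** Hypothetical in-memory implementation that only logs which output a record used. */
final class RecordingMainOutput<T> implements ReaderMainOutput<T> {
    final List<String> log = new ArrayList<>();
    private final Map<String, SourceOutput<T>> splitOutputs = new HashMap<>();

    @Override
    public void collect(T record) {
        log.add("main:" + record);
    }

    @Override
    public void collect(T record, long timestamp) {
        log.add("main:" + record + "@" + timestamp);
    }

    @Override
    public SourceOutput<T> createOutputForSplit(String splitId) {
        // Assumption: repeated calls for the same split return the same output.
        return splitOutputs.computeIfAbsent(splitId, id -> new SourceOutput<T>() {
            @Override
            public void collect(T record) {
                log.add(id + ":" + record);
            }

            @Override
            public void collect(T record, long timestamp) {
                log.add(id + ":" + record + "@" + timestamp);
            }
        });
    }

    @Override
    public void releaseOutputForSplit(String splitId) {
        splitOutputs.remove(splitId);
    }

    int activeSplitOutputs() {
        return splitOutputs.size();
    }
}

final class TwoSplitReaderDemo {
    /** Stand-in for one pollNext(...) call of a reader that owns two splits. */
    static void pollOnce(ReaderMainOutput<String> output) {
        SourceOutput<String> splitA = output.createOutputForSplit("split-A");
        SourceOutput<String> splitB = output.createOutputForSplit("split-B");
        splitA.collect("x", 1L);
        splitB.collect("y", 2L);
        // split-A is finished, so its output (and its watermark state) can go away.
        output.releaseOutputForSplit("split-A");
    }

    public static void main(String[] args) {
        RecordingMainOutput<String> output = new RecordingMainOutput<>();
        pollOnce(output);
        System.out.println(output.log);
    }
}
```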
      

            People

              sewen Stephan Ewen