FLINK-21871

Support watermark for Hive and Filesystem streaming source




      Hive and Filesystem already support streaming sources. However, they don't support watermarks on the source. That means users can't use the streaming source for Flink's powerful streaming analysis, e.g. window aggregates, interval joins, and so on.

      To let more Hive users leverage Flink for streaming analysis, and to work with the new optimized window-TVF operations (FLIP-145), we need to support watermarks for Hive and Filesystem.

      How to emit watermark in Hive and Filesystem

      Fact data in Hive is usually partitioned by date and time, e.g. pt_day=2021-03-19, pt_hour=10. In this case, once the data of partition pt_day=2021-03-19, pt_hour=10 has been emitted, we know that all data before 2021-03-19 11:00:00 has arrived, so we can emit a watermark of 2021-03-19 11:00:00. We call this a partition watermark.

      A partition watermark is much better than a record watermark (a watermark extracted from records, e.g. ts - INTERVAL '1' MINUTE). In the example above, with a partition watermark the window [10:00, 11:00) is triggered as soon as pt_hour=10 is finished. With a record watermark, the window [10:00, 11:00) is only triggered once data from pt_hour=11 arrives, which delays the pipeline by one more partition.

      Therefore, we first focus on supporting partition watermarks for Hive and Filesystem.
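
      The difference between the two strategies for the window [10:00, 11:00) can be illustrated with a small sketch. This is plain Python, not Flink code: the function names are hypothetical, and the firing rule (a window fires once the watermark reaches its end) is a simplification of Flink's event-time semantics.

```python
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)

def partition_watermark(partition_start, interval=HOUR):
    """Watermark emitted once a partition is fully read: partition start + interval."""
    return partition_start + interval

def record_watermark(max_ts, delay=timedelta(minutes=1)):
    """Watermark derived from records: largest timestamp seen minus a fixed delay."""
    return max_ts - delay

def window_fires(window_end, watermark):
    # Simplified rule: an event-time window fires once the watermark reaches its end.
    return watermark >= window_end

window_end = datetime(2021, 3, 19, 11, 0, 0)
pt_hour_10 = datetime(2021, 3, 19, 10, 0, 0)

# State right after partition pt_hour=10 has been fully read.
last_ts_hour_10 = datetime(2021, 3, 19, 10, 59, 59, 999000)
fires_with_partition_wm = window_fires(window_end, partition_watermark(pt_hour_10))
fires_with_record_wm = window_fires(window_end, record_watermark(last_ts_hour_10))

# The record watermark passes 11:00 only after data from pt_hour=11 shows up.
first_ts_hour_11 = datetime(2021, 3, 19, 11, 1, 0)
fires_after_next_partition = window_fires(window_end, record_watermark(first_ts_hour_11))
```

      Here only the partition watermark fires the window when pt_hour=10 completes; the record watermark needs a record from the next partition.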


      To support such watermarks, we propose the following DDL to define a Hive table with a watermark:

      -- using hive dialect
      CREATE TABLE hive_table (
        x int,
        y string,
        z int,
        ts timestamp,
        WATERMARK FOR ts AS SOURCE_WATERMARK()
      ) PARTITIONED BY (pt_day string, pt_hour string)
      TBLPROPERTIES (
        'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
        'partition.time-interval'='1h'  -- assumed value for hourly partitions
      );

      -- window aggregate on the hive table
      SELECT window_start, window_end, COUNT(*), MAX(y), SUM(z)
      FROM TABLE(
         TUMBLE(TABLE hive_table, DESCRIPTOR(ts), INTERVAL '1' HOUR))
      GROUP BY window_start, window_end;

      For the filesystem connector, the DDL can be:

      CREATE TABLE fs_table (
          x int,
          y string,
          z int,
          ts TIMESTAMP(3),
          pt_day string,
          pt_hour string,
          WATERMARK FOR ts AS SOURCE_WATERMARK()
      ) PARTITIONED BY (pt_day, pt_hour)
        WITH (
          'connector' = 'filesystem',
          'path' = '/path/to/file',
          'format' = 'parquet',
          'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
          'partition.time-interval'='1h'  -- assumed value for hourly partitions
      );

      The new function and configuration options are explained below.

      SOURCE_WATERMARK built-in function

      FLIP-66[1] proposed a SYSTEM_WATERMARK function for watermarks preserved in the underlying source system.
      However, the SYSTEM prefix sounds like a value generated by the Flink system, whereas it is actually a value generated by the SOURCE system.
      So I propose to use SOURCE_WATERMARK instead. This also keeps the concept aligned with the API of org.apache.flink.table.descriptors.Rowtime#watermarksFromSource.

      Table Options for Watermark

      • partition.time-extractor.timestamp-pattern: this option already exists. It is used to extract/convert partition values to a timestamp value.
      • partition.time-interval: this is a new option. It indicates the minimal time interval between partitions. It is used to calculate the correct watermark when a partition is finished: watermark = partition-timestamp + time-interval.
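
      How the two options combine can be sketched as follows. This is an illustrative Python sketch, not the connector's implementation: the helper names are hypothetical, and it assumes the substituted pattern always yields a `yyyy-MM-dd HH:mm:ss` string.

```python
from datetime import datetime, timedelta
import re

def extract_partition_time(pattern, partition_values):
    """Substitute $name placeholders with partition values, then parse the timestamp."""
    text = re.sub(r"\$(\w+)", lambda m: partition_values[m.group(1)], pattern)
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

def partition_watermark(pattern, partition_values, interval):
    """watermark = partition-timestamp + time-interval"""
    return extract_partition_time(pattern, partition_values) + interval

wm = partition_watermark(
    "$pt_day $pt_hour:00:00",
    {"pt_day": "2021-03-19", "pt_hour": "10"},
    timedelta(hours=1),
)
print(wm)  # 2021-03-19 11:00:00
```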

      How to support watermark for existing Hive tables

      We can't create a new table for an existing Hive table, so we should support altering an existing Hive table to add the watermark information.
      This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634. Because watermarks, computed columns, and table options are all encoded in Hive table parameters,
      other systems (e.g. Hive MR, Spark) can still read this Hive table as usual.

      ALTER TABLE hive_table ADD (
        WATERMARK FOR ts AS SOURCE_WATERMARK()
      );
      ALTER TABLE hive_table SET (
        'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00'
      );

      Implementation Details

      1. SplitEnumerator: monitors new partitions through PartitionMonitor, sorts partitions by partition name, adds the splits of new partitions to SplitAssigner, and tags the last split of each partition.
      2. SourceReader: requests a split from SplitEnumerator at setup and whenever it has finished reading a split.
      3. SplitEnumerator: gets a split from SplitAssigner and assigns it to the requesting reader. If the split is the last one of its partition, it broadcasts a watermark event to all readers.
      4. SourceReader receives a split: it starts reading the data of the assigned split.
      5. SourceReader receives a watermark: if it has assigned splits, it outputs the received watermark once those splits are read out. If it has no assigned splits, it outputs the received watermark immediately.

      Note: the SplitAssigner should assign splits in FIFO order.
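
      The five steps above can be simulated in a few lines. This is a single-threaded Python sketch with hypothetical classes, not the FLIP-27 interfaces: the enumerator and the assigner are merged into one FIFO queue, and watermarks are plain strings.

```python
from collections import deque
from dataclasses import dataclass, field

@dataclass
class Split:
    partition: str
    index: int
    last_in_partition: bool = False

@dataclass
class Reader:
    name: str
    assigned: deque = field(default_factory=deque)
    pending_watermarks: list = field(default_factory=list)
    output: list = field(default_factory=list)

    def receive_split(self, split):
        self.assigned.append(split)

    def receive_watermark(self, watermark):
        # Step 5: hold the watermark while splits are assigned, else emit now.
        if self.assigned:
            self.pending_watermarks.append(watermark)
        else:
            self.output.append(("watermark", watermark))

    def read_one_split(self):
        # Step 4: read the split, then flush held watermarks if nothing is left.
        split = self.assigned.popleft()
        self.output.append(("data", split.partition, split.index))
        if not self.assigned:
            for wm in self.pending_watermarks:
                self.output.append(("watermark", wm))
            self.pending_watermarks.clear()

class Enumerator:
    def __init__(self, partitions, readers, watermark_of):
        self.readers = readers
        self.watermark_of = watermark_of
        self.queue = deque()  # FIFO assignment order
        # Step 1: sort partitions by name and tag the last split of each.
        for name in sorted(partitions):
            count = partitions[name]
            for i in range(count):
                self.queue.append(Split(name, i, last_in_partition=(i == count - 1)))

    def handle_request(self, reader):
        # Step 3: assign the next split; broadcast a watermark after a last split.
        if not self.queue:
            return False
        split = self.queue.popleft()
        reader.receive_split(split)
        if split.last_in_partition:
            wm = self.watermark_of(split.partition)
            for r in self.readers:
                r.receive_watermark(wm)
        return True

a, b = Reader("a"), Reader("b")
watermarks = {"pt_hour=10": "11:00", "pt_hour=11": "12:00"}
enum = Enumerator({"pt_hour=10": 3, "pt_hour=11": 1}, [a, b], watermarks.__getitem__)

# Step 2: readers request a split at setup and after reading out a split.
enum.handle_request(a)
enum.handle_request(b)
for reader in (a, b, a, b):
    reader.read_one_split()
    enum.handle_request(reader)
```

      In this run, reader b emits watermark 11:00 only after it has read out its own split of pt_hour=10, even though the watermark was broadcast earlier, when the last split of that partition went to reader a.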

      The above implementation doesn't require any new interface or method in the FLIP-27 source. Everything can be implemented in the Hive/Filesystem connector modules.

      [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL





              • Assignee:
                jark Jark Wu


                • Created: