Hive and Filesystem already support streaming source. However, they doesn't support watermark on the source. That means users can't leverage the streaming source to perform the Flink powerful streaming analysis, e.g. window aggregate, interval join, and so on.
In order to make more Hive users can leverage Flink to perform streaming analysis, and also cooperate with the new optimized window-TVF operations (FLIP-145), we need to support watermark for Hive and Filesystem.
Factual data in Hive are usually partitioned by date time, e.g. pt_day=2021-03-19, pt_hour=10. In this case, when the data of partition pt_day=2021-03-19, pt_hour=10 are emitted, we should be able to know all the data before 2021-03-19 11:00:00 have been arrived, so we can emit a watermark value of 2021-03-19 11:00:00. We call this partition watermark.
The partition watermark is much better than record watermark (extract watermark from record, e.g. ts - INTERVAL '1' MINUTE). Because in above example, if we are using partition watermark, the window of [10:00, 11:00) will be triggered when pt_hour=10 is finished. However, if we are using record watermark, the window of [10:00, 11:00) will be triggered when pt_hour=11 is arrived, that will make the pipeline have one more partition dely.
Therefore, we firstly focus on support partition watermark for Hive and Filesystem.
In order to support such watermarks, we propose using the following DDL to define a Hive table with watermark defined:
For filesystem connector, the DDL can be:
I will explain the new function/configuration.
FLIP-66 proposed SYSTEM_WATERMARK function for watermarks preserved in underlying source system.
However, the SYSTEM prefix sounds like a Flink system generated value, but actually, this is a SOURCE system generated value.
So I propose to use SOURCE_WATERMARK intead, this also keeps the concept align with the API of org.apache.flink.table.descriptors.Rowtime#watermarksFromSource.
- partition.time-extractor.timestamp-pattern: this option already exists. This is used to extract/convert partition value to a timestamp value.
- partition.time-interval: this is a new option. It indicates the minimal time interval of the partitions. It's used to calculate the correct watermark when a partition is finished. The watermark = partition-timestamp + time-inteval.
We all know that we can't create a new table for an existing Hive table. So we should support altering existing Hive table to add the watermark inforamtion.
This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634. Because watermark, computed column, table options are all encoded in Hive table parameters,
so other systems (e.g. Hive MR, Spark) can still read this Hive table as usual.
1. SplitEnumerator: monitors new partitions throught PartitionMonitor, sorts partitions by partition name, adds new splits of new partitions to SplitAssigner, and tags the last split of each partition.
2. SourceReader: request split to SplitEnumerator, when setup or read out a split.
3. SplitEnumerator: get split from SplitAssigner, assigned it to the requested reader. If the split is the last one of the partition, then broadcast a watermark event to all the readers.
4. SourceReader receive split: start to read data of the assigned split
5. SourceReader recieve watermark: If there is assigned splits, output received watermark when splits are read out. If no assigned splits, output received watermark right now.
Note: the SplitAssigner should assign splits in FIFO order.
The above implementation doesn't require new interface or new method of FLIP-27 source. All can be implemented in Hive/Filesystem connector module.
|Update type of HiveTablePartition#partitionSpec from Map<String, Object> to Map<String, String>||Closed|
|Introduce SOURCE_WATERMARK built-infunction to preserve watermark from source||Closed|
|Support higher precision of TIMESTAMP type as rowtime attribute||Open||Unassigned|
|Hive streaming source should use FIFO FileSplitAssigner instead of LIFO||Open||Unassigned|
|Support computed column syntax for Hive DDL dialect||Open||Unassigned|
|Support watermark syntax for Hive DDL dialect||Open||Unassigned|
|Support HiveSource to emit watermarks which is extract from partition values||Open||Unassigned|
|Unify API and implementation for Hive and Filesystem source connector||Open||Unassigned|