Details
- Type: New Feature
- Status: Open
- Priority: Major
- Resolution: Unresolved
Description
Currently, the Flink Kafka Connector does not support split deletion; it is left as a TODO. I would like to add this feature with the following steps:
1. Add SplitsDeletion event to flink-connector-base, which currently only has SplitsAddition.
2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a SplitsDeletion event to the source operator. To maintain compatibility, a default empty implementation for this method will be added.
3. Make SourceOperator handle the SplitsDeletion event by notifying the SourceReader to delete the splits.
4. Add a deleteSplits method to SourceReader that removes the given splits from the split state and stops the SourceReader from reading them.
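The four steps above could be sketched roughly as follows. This is a self-contained toy model, not Flink code: `SplitsDeletion`, `ToyEnumeratorContext`, `DirectContext`, `ToyOperator`, `ToyReader`, and the string split IDs are hypothetical stand-ins, and the real SplitEnumeratorContext, SourceOperator, and SourceReader have different signatures.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the proposed flow; none of these types are real Flink classes. */
final class SplitDeletionSketch {

    /** Step 1 (hypothetical): event carrying the IDs of the splits to drop. */
    public static final class SplitsDeletion {
        public final List<String> splitIds;

        public SplitsDeletion(List<String> splitIds) {
            this.splitIds = List.copyOf(splitIds);
        }
    }

    /** Step 2 (hypothetical): the default no-op keeps existing implementations compiling. */
    public interface ToyEnumeratorContext {
        default void deleteSplits(List<String> splitIds) {}
    }

    /** Step 4 (hypothetical): reader-side contract with a deleteSplits method. */
    public interface ToyReader {
        void addSplits(List<String> splitIds);

        void deleteSplits(List<String> splitIds);

        List<String> assignedSplits();
    }

    /** Reader that keeps its split state in an insertion-ordered set. */
    public static final class SimpleReader implements ToyReader {
        private final Set<String> splits = new LinkedHashSet<>();

        @Override
        public void addSplits(List<String> splitIds) {
            splits.addAll(splitIds);
        }

        @Override
        public void deleteSplits(List<String> splitIds) {
            // Dropping a split from the state also stops it from being read.
            splitIds.forEach(splits::remove);
        }

        @Override
        public List<String> assignedSplits() {
            return new ArrayList<>(splits);
        }
    }

    /** Step 3 (hypothetical): the operator forwards the event to its reader. */
    public static final class ToyOperator {
        private final ToyReader reader;

        public ToyOperator(ToyReader reader) {
            this.reader = reader;
        }

        public void handleOperatorEvent(Object event) {
            if (event instanceof SplitsDeletion) {
                reader.deleteSplits(((SplitsDeletion) event).splitIds);
            }
        }
    }

    /** Context that delivers the event to a single operator directly (no RPC in this toy). */
    public static final class DirectContext implements ToyEnumeratorContext {
        private final ToyOperator operator;

        public DirectContext(ToyOperator operator) {
            this.operator = operator;
        }

        @Override
        public void deleteSplits(List<String> splitIds) {
            operator.handleOperatorEvent(new SplitsDeletion(splitIds));
        }
    }

    public static void main(String[] args) {
        SimpleReader reader = new SimpleReader();
        reader.addSplits(List.of("topic-a-0", "topic-b-0"));

        ToyEnumeratorContext context = new DirectContext(new ToyOperator(reader));
        context.deleteSplits(List.of("topic-b-0"));

        System.out.println(reader.assignedSplits()); // prints [topic-a-0]
    }
}
```

In this sketch the enumerator-side default is a no-op, so a context that does not override `deleteSplits` silently ignores the request. Whether the real default should be silent or should throw is a design question the proposal leaves open.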
As an alternative that does not modify flink-connector-base, KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for splits deletion and handle it in the Kafka-connector-specific code. However, I think it is better to have SplitsDeletion in flink-connector-base so that other connectors can use it too.
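For comparison, the connector-local alternative might look something like the sketch below. It is again a toy model under stated assumptions: `ToySourceEvent`, `RemovePartitionsEvent`, and `ToyKafkaReader` are hypothetical names, not classes from Flink or the Kafka connector. The point is only that the generic layer passes an opaque event through, and the connector's own reader decides what it means.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the connector-local alternative; not real Flink or Kafka connector code. */
final class CustomEventSketch {

    /** Stand-in for a generic, opaque source event that the base layer passes through. */
    public interface ToySourceEvent {}

    /** Hypothetical connector-specific event naming the partitions to drop. */
    public static final class RemovePartitionsEvent implements ToySourceEvent {
        public final Set<String> partitions;

        public RemovePartitionsEvent(Set<String> partitions) {
            this.partitions = Set.copyOf(partitions);
        }
    }

    /** Hypothetical Kafka-style reader that interprets the custom event itself. */
    public static final class ToyKafkaReader {
        private final Set<String> assigned = new TreeSet<>();

        public void assign(String partition) {
            assigned.add(partition);
        }

        /** Unknown event types are ignored, so the base layer needs no changes. */
        public void handleSourceEvents(ToySourceEvent event) {
            if (event instanceof RemovePartitionsEvent) {
                assigned.removeAll(((RemovePartitionsEvent) event).partitions);
            }
        }

        public List<String> assignedPartitions() {
            return new ArrayList<>(assigned);
        }
    }

    public static void main(String[] args) {
        ToyKafkaReader reader = new ToyKafkaReader();
        reader.assign("orders-0");
        reader.assign("orders-1");

        reader.handleSourceEvents(new RemovePartitionsEvent(Set.of("orders-1")));

        System.out.println(reader.assignedPartitions()); // prints [orders-0]
    }
}
```

The trade-off is visible in the types: every connector that wants deletion would have to define its own event and its own handling, which is the duplication a shared SplitsDeletion event would avoid.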
Let me know if you have any thoughts or ideas. Thanks!
Related Issues: FLINK-30490
Issue Links
- duplicates FLINK-30490: Deleted topic from KafkaSource is still included in subsequent restart from savepoint (Closed)