Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
kafka-3.1.0
None
None
Description
When I start a streaming job that consumes from Kafka (flink-connector-kafka-3.1-SNAPSHOT), the Flink task does not automatically detect new partitions after partitions are added to the Kafka topic.
Reason
KafkaSourceBuilder checks whether partition.discovery.interval.ms has been set by the user. Because I did not set it, the builder sets partition.discovery.interval.ms = -1, even though the boundedness is CONTINUOUS_UNBOUNDED.
As a result, KafkaSourceEnumerator sees partition.discovery.interval.ms = -1 instead of the default of 5 minutes, so automatic partition discovery is disabled and the 5-minute default never takes effect.
A possible solution is to set partition.discovery.interval.ms = -1 only when boundedness == Boundedness.BOUNDED.
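To make the difference concrete, here is a minimal, self-contained sketch that contrasts the behaviour described above with the proposed one. It is not the connector's actual code: the class `PartitionDiscoveryDefaults`, the local `Boundedness` enum, and the methods `reportedBehaviour` and `proposedBehaviour` are hypothetical stand-ins, and the reported logic is modelled only on the description in this issue.

```java
import java.util.Properties;

// Hypothetical stand-in for the builder logic; not part of flink-connector-kafka.
public class PartitionDiscoveryDefaults {

    // Local stand-in for Flink's Boundedness enum.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    static final String DISCOVERY_KEY = "partition.discovery.interval.ms";

    // Behaviour as described in this issue (assumption): -1 is written
    // whenever the user left the key unset, regardless of boundedness.
    static Properties reportedBehaviour(Properties userProps, Boundedness boundedness) {
        Properties props = new Properties();
        props.putAll(userProps);
        boolean userSet = props.getProperty(DISCOVERY_KEY) != null;
        if (!userSet || boundedness == Boundedness.BOUNDED) {
            props.setProperty(DISCOVERY_KEY, "-1");
        }
        return props;
    }

    // Proposed behaviour: only a bounded source forces -1. For an unbounded
    // source the key stays unset, so the enumerator can apply its default.
    static Properties proposedBehaviour(Properties userProps, Boundedness boundedness) {
        Properties props = new Properties();
        props.putAll(userProps);
        if (boundedness == Boundedness.BOUNDED) {
            props.setProperty(DISCOVERY_KEY, "-1");
        }
        return props;
    }

    public static void main(String[] args) {
        Properties empty = new Properties();
        System.out.println("reported, unbounded: "
                + reportedBehaviour(empty, Boundedness.CONTINUOUS_UNBOUNDED).getProperty(DISCOVERY_KEY));
        System.out.println("proposed, unbounded: "
                + proposedBehaviour(empty, Boundedness.CONTINUOUS_UNBOUNDED).getProperty(DISCOVERY_KEY));
    }
}
```

With the proposed variant, an unbounded source that never sets the key leaves it absent, so the 5-minute default would apply. Until such a change lands, explicitly setting partition.discovery.interval.ms on the source builder should avoid the -1 default, since the override described above only happens when the key is unset.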