Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.12.1
-
None
Description
Currently, when FlinkKafkaConsumer is restored from savepoint, the following code will handle topics that do not have offsets committed (for example if a new topic was added):
if (restoredState != null) { for (KafkaTopicPartition partition : allPartitions) { if (!restoredState.containsKey(partition)) { restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); } }
So if we have a KafkaConsumer with topicPattern and the pattern is changed, new topis will always start from earliest offset, even if originally the setting was different.