Currently, to start reading from the "earliest" and "latest" position in topics for the Flink Kafka consumer, users set the Kafka config auto.offset.reset in the provided properties configuration.
However, the way this config actually works might be a bit misleading if users were trying to find a way to "read topics from a starting position". The way the auto.offset.reset config works in the Flink Kafka consumer resembles Kafka's original intent for the setting: first, existing external offsets committed to the ZK / brokers will be checked; if none exists, then will auto.offset.reset be respected.
I propose to add Flink-specific ways to define the starting position, without taking into account the external offsets. The original behaviour (reference external offsets first) can be changed to be a user option, so that the behaviour can be retained for frequent Kafka users that may need some collaboration with existing non-Flink Kafka consumer applications.
How users will interact with the Flink Kafka consumer after this is added, with a newly introduced flink.starting-position config:
Or, reference external offsets in ZK / broker:
A thing we would need to decide on is what would the default value be for flink.starting-position.
Two merits I see in adding this:
1. This compensates the way users generally interpret "read from a starting position". As the Flink Kafka connector is somewhat essentially a "high-level" Kafka consumer for Flink users, I think it is reasonable to add Flink-specific functionality that users will find useful, although it wasn't supported in Kafka's original consumer designs.
2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is used only to expose progress to the outside world, and not used to manipulate how Kafka topics are read in Flink (unless users opt to do so)" is even more definite and solid. There was some discussion in this PR (https://github.com/apache/flink/pull/1690,
FLINK-3398) on this aspect. I think adding this "decouples" more Flink's internal offset checkpointing from the external Kafka's offset store.