Affects Version/s: 1.4.2
Fix Version/s: None
Component/s: Connectors / Kafka
On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began consuming records from the earliest offsets available in Kafka for the partitions assigned to it. Other subtasks did not exhibit this behavior and continued operating normally.
Prior to the event the job exhibited no Kafka lag. There were no failed checkpoints, and the job did not restore or restart. The Flink logs showed only the following message:
The job is configured with checkpoints at 1-minute intervals. The Kafka connector consumer is configured via `setStartFromGroupOffsets()` to start from group offsets when it is not restored from a savepoint, and the Kafka consumer is configured to fall back to the earliest offsets if no group offsets have been committed, by setting `auto.offset.reset` to `earliest` in the Kafka consumer config.
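For reference, a minimal sketch of the setup described above, assuming Flink 1.4.x with the 0.11 Kafka connector; the topic name, bootstrap servers, and group id below are hypothetical placeholders, not taken from the affected job:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoints at 1-minute intervals

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");   // placeholder
        // Kafka-side fallback: start from the earliest offsets when no group
        // offsets have been committed for a partition.
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
        // Flink-side start position: use the committed group offsets unless the
        // job is restored from a savepoint/checkpoint.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("kafka-consumer-job");
    }
}
```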
Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions for around 30 seconds as a result of losing its connection to ZooKeeper.
The broker reconnected to ZK after a few tries:
By 9:35:02 partitions had returned to the broker.
It appears this is the broker the subtask was consuming from, as its outgoing network traffic spiked after the broker recovered leadership of its partitions, which is consistent with the subtask starting to consume from the earliest offsets.
This may be related to [KAFKA-5600](https://issues.apache.org/jira/browse/KAFKA-5600), which affects 0.11.0.0 (the version we are running) and was fixed in 0.11.0.1. But that seems unlikely, as the Flink Kafka connector consumer shouldn't use the offsets committed to Kafka when operating with checkpoints enabled, nor when the job is not restarting or being restored.