Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 0.11.0.0
Fix Version/s: None
Component/s: None
Description
I use the Kafka consumer Java API to poll messages from the broker. Here is the code:
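// props sets bootstrap.servers, group.id and deserializers (String key/value types assumed here)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(topicPartitions);  // manual assignment, no group rebalancing
long nextOffset = consumer.position(topicPartition);  // the group's committed offset, if one exists
ConsumerRecords<String, String> records = consumer.poll(1000);  // 0.11 requires a timeout in ms; 1000 is illustrative
consumer.commitSync();  // commits the positions reached by poll()
consumer.close();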
The code above is run by a Quartz scheduler every minute, always with the same group.id. It worked properly for several days until today at around 8:20:35 am. From then on, position() kept returning an old offset committed two days ago instead of the latest one, which was committed at around 8:20:33 am. It looks as if the committed offset for this group.id went backward.
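A minimal client-side guard against this symptom could record what each run commits and compare it with what position() reports on the next run. This sketch assumes the Quartz job runs in one long-lived JVM, so a shared in-memory instance survives between runs; the class and method names are hypothetical, not part of the Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: remembers the last offset this job committed per partition
// and flags a later position() result that is lower than that offset.
final class OffsetRegressionDetector {
    // Key is "topic-partition", e.g. "my-topic-0"; value is the highest offset committed so far.
    private final Map<String, Long> lastCommitted = new HashMap<>();

    // Call after a successful commitSync() with the offset that was committed.
    void recordCommit(String topicPartition, long committedOffset) {
        lastCommitted.merge(topicPartition, committedOffset, Long::max);
    }

    // Returns true if the position reported on a later run is behind the last commit.
    boolean isRegression(String topicPartition, long reportedPosition) {
        Long previous = lastCommitted.get(topicPartition);
        return previous != null && reportedPosition < previous;
    }
}
```

In the job, the value to pass to recordCommit is consumer.position(tp) read right after commitSync() (the position that was just committed), and the value to pass to isRegression is the one position() returns before the next poll().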
I consumed the offset commit messages from the internal topic __consumer_offsets and saw that the latest message was correct. It looks like this:
[eb89887c591b4d2a98c7,my-topic-eb89887c591b4d2a98c7,0]::[OffsetMetadata[447648316,NO_METADATA],CommitTime 1525220421173,ExpirationTime 1526430021173]
The commitTime showed it was indeed the last successful commit.
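The record above can be decoded to check the timestamps. The following sketch parses a line in exactly the format shown; the class name is hypothetical and the regex only covers this one layout. Decoding gives a CommitTime of 2018-05-02T00:20:21.173Z, which would be 08:20:21 local time if the reporter is in a UTC+8 zone (an assumption), and an ExpirationTime exactly 14 days (1,209,600,000 ms) after the commit.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for one __consumer_offsets line in the format shown above:
// [group,topic,partition]::[OffsetMetadata[offset,metadata],CommitTime t,ExpirationTime t]
final class OffsetCommitLine {
    private static final Pattern FORMAT = Pattern.compile(
        "^\\[(.+),([^,]+),(\\d+)\\]::\\[OffsetMetadata\\[(\\d+),([^\\]]*)\\],"
            + "CommitTime (\\d+),ExpirationTime (\\d+)\\]$");

    final String groupId;
    final String topic;
    final int partition;
    final long offset;
    final Instant commitTime;
    final Instant expirationTime;

    private OffsetCommitLine(String groupId, String topic, int partition, long offset,
                             Instant commitTime, Instant expirationTime) {
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.commitTime = commitTime;
        this.expirationTime = expirationTime;
    }

    static OffsetCommitLine parse(String line) {
        Matcher m = FORMAT.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("unrecognized line: " + line);
        }
        return new OffsetCommitLine(m.group(1), m.group(2), Integer.parseInt(m.group(3)),
            Long.parseLong(m.group(4)),
            Instant.ofEpochMilli(Long.parseLong(m.group(6))),   // CommitTime is epoch millis
            Instant.ofEpochMilli(Long.parseLong(m.group(7))));  // ExpirationTime is epoch millis
    }

    // Time between the commit and the point at which the offset becomes eligible to expire.
    Duration retention() {
        return Duration.between(commitTime, expirationTime);
    }
}
```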
However, position() then returned a wrong offset: the one from the first message in the corresponding partition of __consumer_offsets. It is as if the broker treated this older commit as the current offset for the group.id, when the correct one should have been the last message in __consumer_offsets.
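The expectation that the last record for a key wins can be illustrated with a simplified model of loading committed offsets: replay the partition in log order and let each later record for the same (group, topic, partition) key overwrite the earlier one, with a tombstone deleting it. This is a sketch of last-write-wins replay under those assumptions, not the broker's actual loading code; the class names are hypothetical. Under this model, a complete replay can never end on the first of several commits for the same key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of rebuilding committed offsets from a
// __consumer_offsets partition by replaying its records in log order.
final class OffsetReplay {
    static final class Commit {
        final String key;   // "group,topic,partition"
        final Long offset;  // null models a tombstone (offset deleted)

        Commit(String groupId, String topic, int partition, Long offset) {
            this.key = groupId + "," + topic + "," + partition;
            this.offset = offset;
        }
    }

    // Returns the committed offset per key after replaying the whole log.
    static Map<String, Long> load(List<Commit> log) {
        Map<String, Long> committed = new HashMap<>();
        for (Commit c : log) {
            if (c.offset == null) {
                committed.remove(c.key);          // tombstone removes the entry
            } else {
                committed.put(c.key, c.offset);   // a later record overwrites an earlier one
            }
        }
        return committed;
    }
}
```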
I then checked the broker server log and found some connection errors at the same time position() was called:
08:20:33,261 WARN Attempting to send response via channel for which there is no open connection, connection id 2 (kafka.network.Processor)
Other consumers also called position() at that time, and this broker was the leader for their topics as well. They, too, got wrong offsets, corresponding to older commits in __consumer_offsets.