Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 4.0.0-incubating, 4.1.0-incubating
Description
It has been reported that a new consumer group configured with CONSUME_FROM_LAST_OFFSET still consumes old messages.
In most cases, when a new consumer group starts, CONSUME_FROM_LAST_OFFSET works as expected. The relevant logic is in queryConsumerOffset in ConsumerManageProcessor.java.
But sometimes the broker returns the minimum offset instead, which is confusing. This happens only when the topic is still quite new (minOffset == 0) and there is no large accumulation of messages (checkInDiskByConsumeOffset returns false).
    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(
            requestHeader.getTopic(), requestHeader.getQueueId());
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            // here is why sometimes CONSUME_FROM_LAST_OFFSET does not work as expected
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
Apparently this is meant to ensure that when a queue is newly created by scaling out a topic, the messages in that new queue are still consumed from the beginning.
But it also means that CONSUME_FROM_LAST_OFFSET does not work as expected for a new consumer group on such a queue.
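The interaction described above can be sketched as a small standalone model. This is only an illustration, not the broker's code: the class OffsetQuerySketch and its method names are hypothetical, -1 stands in for the QUERY_NOT_FOUND response, and the client-side fallback to the queue's max offset is an assumption about how CONSUME_FROM_LAST_OFFSET behaves when no offset is found.

```java
public class OffsetQuerySketch {
    // Hypothetical stand-in for a QUERY_NOT_FOUND response from the broker.
    static final long NOT_FOUND = -1L;

    /**
     * Simplified model of the broker branch quoted above.
     * storedOffset < 0 means the consumer group has no committed offset yet.
     */
    static long brokerQueryOffset(long storedOffset, long minOffset, boolean inDiskAtZero) {
        if (storedOffset >= 0) {
            return storedOffset;          // known group: return its committed offset
        }
        if (minOffset <= 0 && !inDiskAtZero) {
            return 0L;                    // young queue: broker answers SUCCESS with offset 0
        }
        return NOT_FOUND;                 // otherwise: QUERY_NOT_FOUND
    }

    /**
     * Assumed client behaviour for CONSUME_FROM_LAST_OFFSET:
     * use the broker's answer if there is one, else start at maxOffset.
     */
    static long resolveStartOffset(long brokerAnswer, long maxOffset) {
        return brokerAnswer >= 0 ? brokerAnswer : maxOffset;
    }

    public static void main(String[] args) {
        long maxOffset = 500L;
        // New group, young queue (minOffset == 0, nothing on disk at offset 0).
        long young = resolveStartOffset(brokerQueryOffset(-1L, 0L, false), maxOffset);
        // New group, older queue whose earliest messages have been cleaned up.
        long older = resolveStartOffset(brokerQueryOffset(-1L, 120L, false), maxOffset);
        System.out.println("young queue starts at " + young); // 0: old messages are replayed
        System.out.println("older queue starts at " + older); // 500: only new messages
    }
}
```

In this model the new group on the young queue starts at offset 0 and replays all existing messages, while the same group on the older queue starts at the max offset, which is the inconsistency this issue reports.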
A similar bug was reported in https://issues.apache.org/jira/browse/ROCKETMQ-112, but it was closed as WON'T FIX.