Details
- Bug
- Status: Closed
- Minor
- Resolution: Duplicate
- 0.9.0.1
- None
- None
Description
KafkaConsumer v0.9::
I have a consumer configured with session.timeout.ms set to 30s. I make a call like
    consumer.poll(10000)
but if the Kafka broker is down, that call hangs indefinitely.
Digging into the code, it seems that the timeout isn't respected.
KafkaConsumer calls out to pollOnce(), as seen below::
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
        coordinator.ensureCoordinatorKnown();

        // ensure we have partitions assigned if we expect to
        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment();

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // init any new fetches (won't resend pending fetches)
        Cluster cluster = this.metadata.fetch();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

        // if data is available already, e.g. from a previous network client poll() call to commit,
        // then just return it immediately
        if (!records.isEmpty())
            return records;

        fetcher.initFetches(cluster);
        client.poll(timeout);
        return fetcher.fetchedRecords();
    }
and we see that we get stuck on the call to coordinator.ensureCoordinatorKnown();
AbstractCoordinator::
    public void ensureCoordinatorKnown() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupMetadataRequest();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            }
        }
    }
In this case the future fails (since the broker is down) and is retriable, so client.awaitMetadataUpdate() is called, which in ConsumerNetworkClient blocks forever because the metadata version never changes while the broker is unreachable:
    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }
I feel that this is a bug. When you set a timeout on a call to a blocking method, that timeout should be respected and an exception should be thrown.
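To illustrate the behaviour I would expect, here is a minimal sketch of a deadline-aware version of that wait loop. It is not the Kafka implementation and not a proposed patch: MetadataStub, Poller and PollTimeoutException are hypothetical stand-ins I made up so the example runs on its own, and throwing an unchecked exception on expiry is just one possible choice.

```java
import java.util.concurrent.TimeUnit;

/** Sketch only: MetadataStub, Poller and PollTimeoutException are hypothetical stand-ins, not Kafka classes. */
final class BoundedAwait {

    /** Stand-in for the consumer metadata; only the version counter matters here. */
    static final class MetadataStub {
        private int version;
        synchronized int requestUpdate() { return version; }
        synchronized int version() { return version; }
        synchronized void update() { version++; }
    }

    /** Stand-in for one network poll that blocks for at most timeoutMs. */
    interface Poller {
        void poll(long timeoutMs);
    }

    /** Unchecked, so existing callers would not need a new throws clause. */
    static final class PollTimeoutException extends RuntimeException {
        PollTimeoutException(String message) { super(message); }
    }

    /** Waits for a metadata version change, but gives up once timeoutMs has elapsed. */
    static void awaitMetadataUpdate(MetadataStub metadata, Poller poller, long timeoutMs) {
        int version = metadata.requestUpdate();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0)
                throw new PollTimeoutException("No metadata update within " + timeoutMs + " ms");
            // Never pass more than the remaining budget to the underlying poll.
            poller.poll(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        } while (metadata.version() == version);
    }

    public static void main(String[] args) {
        // Broker "up": the poll bumps the metadata version, so the wait returns.
        MetadataStub reachable = new MetadataStub();
        awaitMetadataUpdate(reachable, t -> reachable.update(), 1000);
        System.out.println("updated to version " + reachable.version());

        // Broker "down": the version never changes, so the wait times out instead of hanging.
        MetadataStub unreachable = new MetadataStub();
        try {
            awaitMetadataUpdate(unreachable, t -> { }, 50);
        } catch (PollTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that each inner poll receives the remaining budget rather than Long.MAX_VALUE, so the outer timeout bounds the whole call. A retry loop like ensureCoordinatorKnown() would need the same remaining-time bookkeeping to honour the timeout passed to poll().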
Issue Links
- duplicates KAFKA-1894 Avoid long or infinite blocking in the consumer (Resolved)