From 0b37d2128cc483e11ada475e905139e36eb5d25a Mon Sep 17 00:00:00 2001 From: Hongliang Su Date: Sun, 15 Feb 2015 15:34:09 +0800 Subject: [PATCH 5/5] fix: new consumer client can't fetch records from kafka server fetch response doesn't return even after one completeAll invoke block until the fetch response returns TODO: whether there is an effective way to wait until fetch response returns --- .../kafka/clients/consumer/KafkaConsumer.java | 31 ++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 67ceb75..3362088 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -706,15 +706,36 @@ public class KafkaConsumer implements Consumer { /* * initiate any needed fetches, then block for the timeout the user specified */ - Cluster cluster = this.metadata.fetch(); - reinstateFetches(cluster, now); - client.poll(timeout, now); +// Cluster cluster = this.metadata.fetch(); +// reinstateFetches(cluster, now); +// client.poll(timeout, now); /* * initiate a fetch request for any nodes that we just got a response from without blocking */ - reinstateFetches(cluster, now); - client.poll(0, now); + //reinstateFetches(cluster, now); + //client.poll(0, now); + + // block until the fetch response returns + // TODO whether there is an effective way to wait until fetch response returns + Cluster cluster = this.metadata.fetch(); + + List lst = null; + while(lst == null || lst.isEmpty()) { + + reinstateFetches(cluster, now); + lst = client.poll(timeout, now); + + if(lst.size() > 0) { + ClientResponse resp = lst.get(0); + short apiKey = resp.request().request().header().apiKey(); + if(apiKey != ApiKeys.FETCH.id) { + log.debug("apiKey is {} not ApiKeys.FETCH:{} continue", apiKey, ApiKeys.FETCH.id); + lst.clear(); + } + + } + } return new ConsumerRecords(consumeBufferedRecords()); } -- 1.9.4.msysgit.2