diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala index f8cf6c3..c72ca14 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala @@ -29,8 +29,9 @@ object ConsumerMetadataResponse { def readFrom(buffer: ByteBuffer) = { val correlationId = buffer.getInt val errorCode = buffer.getShort + val broker = Broker.readFrom(buffer) val coordinatorOpt = if (errorCode == ErrorMapping.NoError) - Some(Broker.readFrom(buffer)) + Some(broker) else None diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index ba5fbdc..fa7f3e4 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -160,6 +160,11 @@ object ClientUtils extends Logging{ debug("Consumer metadata response: " + consumerMetadataResponse.toString) if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) coordinatorOpt = consumerMetadataResponse.coordinatorOpt + else { + info("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds." + .format(queryChannel.host, queryChannel.port, group, retryBackOffMs)) + Thread.sleep(retryBackOffMs) + } } catch { case ioe: IOException =>