From 06e1ea2960c2d0dffc9245fa52a0e281d72c846c Mon Sep 17 00:00:00 2001 From: kyotoYaho Date: Mon, 16 May 2016 20:15:56 +0800 Subject: [PATCH] KYLIN-1696: add catch exception during sending topic meta request to broker --- .../kylin/source/kafka/util/KafkaRequester.java | 24 ++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java index b78d30f..01c3946 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java @@ -101,7 +101,13 @@ public final class KafkaRequester { consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup"); List topics = Collections.singletonList(kafkaClusterConfig.getTopic()); TopicMetadataRequest req = new TopicMetadataRequest(topics); - TopicMetadataResponse resp = consumer.send(req); + TopicMetadataResponse resp; + try{ + resp = consumer.send(req); + }catch (Exception e){ + logger.warn("cannot send TopicMetadataRequest successfully: " + e); + continue; + } final List topicMetadatas = resp.topicsMetadata(); if (topicMetadatas.size() != 1) { break; @@ -124,12 +130,19 @@ public final class KafkaRequester { } public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List brokers, KafkaClusterConfig kafkaClusterConfig) { + logger.debug("Brokers: " + brokers.toString()); SimpleConsumer consumer; for (Broker broker : brokers) { consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup"); List topics = Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); - TopicMetadataResponse resp = consumer.send(req); + TopicMetadataResponse resp; + try{ + resp = consumer.send(req); + }catch (Exception e){ + logger.warn("cannot send TopicMetadataRequest successfully: " + e); + continue; + } final List topicMetadatas = resp.topicsMetadata(); if (topicMetadatas.size() != 1) { logger.warn("invalid topicMetadata size:" + topicMetadatas.size()); @@ -141,6 +154,13 @@ public final class KafkaRequester { break; } for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) { + StringBuffer logText = new StringBuffer(); + logText.append("PartitionMetadata debug errorCode: " + partitionMetadata.errorCode()); + logText.append("PartitionMetadata debug partitionId: " + partitionMetadata.partitionId()); + logText.append("PartitionMetadata debug leader: " + partitionMetadata.leader()); + logText.append("PartitionMetadata debug ISR: " + partitionMetadata.isr()); + logText.append("PartitionMetadata debug replica: " + partitionMetadata.replicas()); + logger.info(logText.toString()); if (partitionMetadata.partitionId() == partitionId) { return partitionMetadata; } -- 2.5.4 (Apple Git-61)