From dc44f7cfbee49644b09c486d6d3c5eed72c5b4b1 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 23 Feb 2016 18:31:37 +0800 Subject: [PATCH] KYLIN-1435: Relax the checking for PartitionMetadata and logger the error code --- .../main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 5 ++++- .../src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java | 5 ++++- .../src/main/java/org/apache/kylin/streaming/KafkaConsumer.java | 5 ++++- .../src/main/java/org/apache/kylin/streaming/StreamingUtil.java | 5 ++++- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index 393b8e7..581fe8f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -133,7 +133,10 @@ public class KafkaStreamingInput implements IStreamingInput { private Broker getLeadBroker() { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } replicaBrokers = partitionMetadata.replicas(); return partitionMetadata.leader(); } else { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index 2833ea4..fc3b7f6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -33,7 +33,10 @@ public final class KafkaUtils { public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } return partitionMetadata.leader(); } else { return null; diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java index 6d693c3..869e555 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java @@ -92,7 +92,10 @@ public class KafkaConsumer implements Runnable { private Broker getLeadBroker() { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(topic, partitionId, replicaBrokers, streamingConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } replicaBrokers = partitionMetadata.replicas(); return partitionMetadata.leader(); } else { diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java index 1d6b95c..4cf837e 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java @@ -30,7 +30,10 @@ public final class StreamingUtil { public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } return partitionMetadata.leader(); } else { return null; -- 2.5.4 (Apple Git-61)