From e9fe27139bcfcb0bd2fe55aa39e140e9885fcd43 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 23 Feb 2016 17:37:11 +0800 Subject: [PATCH 1/2] KYLIN-1435: Relax the checking for PartitionMetadata and logger the error code --- .../main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 5 ++++- .../src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index ee5a555..bcde47b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -123,7 +123,10 @@ public class KafkaStreamingInput implements IStreamingInput { private Broker getLeadBroker() { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } replicaBrokers = partitionMetadata.replicas(); return partitionMetadata.leader(); } else { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index ab54abb..f506999 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -51,7 +51,10 @@ public final class KafkaUtils { public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } return partitionMetadata.leader(); } else { return null; -- 2.5.4 (Apple Git-61) From 84ccfe6f435fa952527c28be6545b35919f5fbb8 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 23 Feb 2016 18:01:17 +0800 Subject: [PATCH 2/2] KYLIN-1436: If error exists during fetching streaming messages, streaming building should throw exception --- .../main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index bcde47b..2e262b3 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -90,8 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput { logger.warn("this thread should not be interrupted, just ignore", e); continue; } catch (ExecutionException e) { - logger.error("error when get StreamingMessages", e.getCause()); - continue; + throw new RuntimeException("error when get StreamingMessages",e.getCause()); } } final Pair timeRange = Pair.newPair(startTime, endTime); -- 2.5.4 (Apple Git-61)