From d2bd5eba532386cdcfc9a29897ee1dfc8d0d148e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 19 Mar 2015 13:33:52 -0700 Subject: [PATCH 1/2] follow-up v1 --- clients/src/main/java/org/apache/kafka/common/record/Record.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index d2332c9..50fac24 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -324,7 +324,6 @@ public final class Record { checksum(), key() == null ? 0 : key().limit(), value() == null ? 0 : value().limit()); - } public boolean equals(Object other) { -- 1.7.12.4 From af81b0d903b6c7a30595e9a1d3c5d68965dba846 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 7 Apr 2015 11:05:14 -0700 Subject: [PATCH 2/2] revert K1910 --- .../org/apache/kafka/clients/consumer/internals/Coordinator.java | 6 ++---- clients/src/main/java/org/apache/kafka/common/protocol/Errors.java | 4 +--- .../apache/kafka/clients/consumer/internals/CoordinatorTest.java | 2 +- core/src/main/scala/kafka/common/ErrorMapping.scala | 4 +--- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 8d44814..377d033 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -234,16 +234,14 @@ public final class Coordinator { coordinatorDead(); offsetsReady = false; Utils.sleep(this.retryBackoffMs); - } else if (data.errorCode == Errors.NO_OFFSETS_FETCHABLE.code() - || data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { // just ignore this partition log.debug("No committed offset for partition " + tp); } else { throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset"); } } else if (data.offset >= 0) { - // record the position with the offset (-1 seems to indicate no - // such offset known) + // record the position with the offset (-1 indicate no committed offset to fetch) offsets.put(tp, data.offset); } else { log.debug("No committed offset for partition " + tp); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index ce18a6c..36aa412 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -69,9 +69,7 @@ public enum Errors { INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")), ILLEGAL_GENERATION(22, - new ApiException("Specified consumer generation id is not valid.")), - NO_OFFSETS_FETCHABLE(23, - new ApiException("No offsets have been committed so far.")); + new ApiException("Specified consumer generation id is not valid.")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index 1de22b9..b06c4a7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -244,7 +244,7 @@ public class CoordinatorTest { assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp)); // fetch with no fetchable offsets - client.prepareResponse(offsetFetchResponse(tp, Errors.NO_OFFSETS_FETCHABLE.code(), "", 100L)); + client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size()); // fetch with offset topic unknown diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index eb1eb4a..c75c685 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -51,7 +51,6 @@ object ErrorMapping { val NotEnoughReplicasAfterAppendCode: Short = 20 // 21: InvalidRequiredAcks // 22: IllegalConsumerGeneration - val NoOffsetsCommittedCode: Short = 23 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -73,8 +72,7 @@ object ErrorMapping { classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode, classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode, classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode, - classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode, - classOf[NoOffsetsCommittedException].asInstanceOf[Class[Throwable]] -> NoOffsetsCommittedCode + classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode ).withDefaultValue(UnknownCode) /* invert the mapping */ -- 1.7.12.4