From 625e2e09a76efeb459f016c21c98c0a4346427d2 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 24 Jul 2015 09:20:47 +0200 Subject: [PATCH] KAFKA-2100; Preserve original server error code when it is an unknown code. --- .../org/apache/kafka/common/protocol/Errors.java | 132 +++++++++++---------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index d6c41c1..74d498b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -27,79 +27,21 @@ import org.apache.kafka.common.errors.*; * * Do not add exceptions that occur only on the client or only on the server here. */ -public enum Errors { - UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")), - NONE(0, null), - OFFSET_OUT_OF_RANGE(1, - new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), - CORRUPT_MESSAGE(2, - new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), - UNKNOWN_TOPIC_OR_PARTITION(3, - new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), - // TODO: errorCode 4 for InvalidFetchSize - LEADER_NOT_AVAILABLE(5, - new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), - NOT_LEADER_FOR_PARTITION(6, - new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), - REQUEST_TIMED_OUT(7, - new TimeoutException("The request timed out.")), - // TODO: errorCode 8 for BrokerNotAvailable - REPLICA_NOT_AVAILABLE(9, - new ApiException("The replica is not available for the requested topic-partition")), - MESSAGE_TOO_LARGE(10, - new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), - OFFSET_METADATA_TOO_LARGE(12, - new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), - NETWORK_EXCEPTION(13, - new NetworkException("The server disconnected before a response was received.")), - OFFSET_LOAD_IN_PROGRESS(14, - new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")), - CONSUMER_COORDINATOR_NOT_AVAILABLE(15, - new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")), - NOT_COORDINATOR_FOR_CONSUMER(16, - new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")), - INVALID_TOPIC_EXCEPTION(17, - new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), - RECORD_LIST_TOO_LARGE(18, - new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), - NOT_ENOUGH_REPLICAS(19, - new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), - NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, - new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")), - INVALID_REQUIRED_ACKS(21, - new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")), - ILLEGAL_GENERATION(22, - new IllegalGenerationException("Specified consumer generation id is not valid.")), - INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23, - new ApiException("The request partition assignment strategy does not match that of the group.")), - UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24, - new ApiException("The request partition assignment strategy is unknown to the broker.")), - UNKNOWN_CONSUMER_ID(25, - new UnknownConsumerIdException("The coordinator is not aware of this consumer.")), - INVALID_SESSION_TIMEOUT(26, - new ApiException("The session timeout is not within an acceptable range.")), - COMMITTING_PARTITIONS_NOT_ASSIGNED(27, - new ApiException("Some of the committing partitions are not assigned the committer")), - INVALID_COMMIT_OFFSET_SIZE(28, - new ApiException("The committing offset data size is not valid")); - +public class Errors { private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); - static { - for (Errors error : Errors.values()) { - codeToError.put(error.code(), error); - if (error.exception != null) - classToError.put(error.exception.getClass(), error); - } - } - private final short code; private final ApiException exception; private Errors(int code, ApiException exception) { this.code = (short) code; this.exception = exception; + + codeToError.put(this.code, this); + if (this.exception != null) { + classToError.put(this.exception.getClass(), this); + } } /** @@ -130,7 +72,11 @@ public enum Errors { */ public static Errors forCode(short code) { Errors error = codeToError.get(code); - return error == null ? UNKNOWN : error; + if (error != null) { + return error; + } else { + return new Errors(code, new UnknownServerException("Unexpected error code " + code)); + } } /** @@ -140,4 +86,60 @@ public enum Errors { Errors error = classToError.get(t.getClass()); return error == null ? UNKNOWN : error; } + + public static final Errors UNKNOWN = new Errors(-1, + new UnknownServerException("The server experienced an unexpected error when processing the request")); + public static final Errors NONE = new Errors(0, null); + public static final Errors OFFSET_OUT_OF_RANGE = new Errors(1, + new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")); + public static final Errors CORRUPT_MESSAGE = new Errors(2, + new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")); + public static final Errors UNKNOWN_TOPIC_OR_PARTITION = new Errors(3, + new UnknownTopicOrPartitionException("This server does not host this topic-partition.")); + // TODO: errorCode 4 for InvalidFetchSize + public static final Errors LEADER_NOT_AVAILABLE = new Errors(5, + new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")); + public static final Errors NOT_LEADER_FOR_PARTITION = new Errors(6, + new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")); + public static final Errors REQUEST_TIMED_OUT = new Errors(7, + new TimeoutException("The request timed out.")); + // TODO: errorCode 8 for BrokerNotAvailable + public static final Errors REPLICA_NOT_AVAILABLE = new Errors(9, + new ApiException("The replica is not available for the requested topic-partition")); + public static final Errors MESSAGE_TOO_LARGE = new Errors(10, + new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")); + public static final Errors OFFSET_METADATA_TOO_LARGE = new Errors(12, + new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")); + public static final Errors NETWORK_EXCEPTION = new Errors(13, + new NetworkException("The server disconnected before a response was received.")); + public static final Errors OFFSET_LOAD_IN_PROGRESS = new Errors(14, + new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")); + public static final Errors CONSUMER_COORDINATOR_NOT_AVAILABLE = new Errors(15, + new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")); + public static final Errors NOT_COORDINATOR_FOR_CONSUMER = new Errors(16, + new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")); + public static final Errors INVALID_TOPIC_EXCEPTION = new Errors(17, + new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")); + public static final Errors RECORD_LIST_TOO_LARGE = new Errors(18, + new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")); + public static final Errors NOT_ENOUGH_REPLICAS = new Errors(19, + new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")); + public static final Errors NOT_ENOUGH_REPLICAS_AFTER_APPEND = new Errors(20, + new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); + public static final Errors INVALID_REQUIRED_ACKS = new Errors(21, + new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")); + public static final Errors ILLEGAL_GENERATION = new Errors(22, + new IllegalGenerationException("Specified consumer generation id is not valid.")); + public static final Errors INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY = new Errors(23, + new ApiException("The request partition assignment strategy does not match that of the group.")); + public static final Errors UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY = new Errors(24, + new ApiException("The request partition assignment strategy is unknown to the broker.")); + public static final Errors UNKNOWN_CONSUMER_ID = new Errors(25, + new UnknownConsumerIdException("The coordinator is not aware of this consumer.")); + public static final Errors INVALID_SESSION_TIMEOUT = new Errors(26, + new ApiException("The session timeout is not within an acceptable range.")); + public static final Errors COMMITTING_PARTITIONS_NOT_ASSIGNED = new Errors(27, + new ApiException("Some of the committing partitions are not assigned the committer")); + public static final Errors INVALID_COMMIT_OFFSET_SIZE = new Errors(28, + new ApiException("The committing offset data size is not valid")); } -- 2.3.2 (Apple Git-55)