From 37793e2fdff9ab4a71715c5b07c309f1df7f809b Mon Sep 17 00:00:00 2001 From: jholoman Date: Wed, 25 Feb 2015 22:16:12 -0500 Subject: [PATCH] First Pass --- .../common/errors/LeaderNotAvailableException.java | 8 +++-- .../common/errors/OffsetMetadataTooLarge.java | 37 ---------------------- .../errors/OffsetMetadataTooLargeException.java | 35 ++++++++++++++++++++ .../kafka/common/errors/RetriableException.java | 2 +- .../org/apache/kafka/common/protocol/Errors.java | 4 +-- core/src/main/scala/kafka/cluster/Partition.scala | 3 ++ .../src/main/scala/kafka/common/ErrorMapping.scala | 2 ++ .../scala/kafka/common/InvalidTopicException.scala | 22 ------------- .../NotEnoughReplicasAfterAppendException.scala | 27 ---------------- .../kafka/common/NotEnoughReplicasException.scala | 25 --------------- .../common/NotLeaderForPartitionException.scala | 25 --------------- .../common/OffsetMetadataTooLargeException.scala | 27 ---------------- .../kafka/common/OffsetOutOfRangeException.scala | 26 --------------- core/src/main/scala/kafka/common/Topic.scala | 1 + core/src/main/scala/kafka/log/Log.scala | 2 ++ .../src/main/scala/kafka/server/DelayedFetch.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 1 + .../main/scala/kafka/server/ReplicaManager.scala | 2 ++ .../src/test/scala/other/kafka/StressTestLog.scala | 1 + .../test/scala/unit/kafka/common/TopicTest.scala | 3 +- .../unit/kafka/integration/PrimitiveApiTest.scala | 3 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 1 + core/src/test/scala/unit/kafka/log/LogTest.scala | 3 +- 23 files changed, 64 insertions(+), 198 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLargeException.java delete mode 100644 core/src/main/scala/kafka/common/InvalidTopicException.scala delete mode 100644 core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala delete mode 100644 core/src/main/scala/kafka/common/NotEnoughReplicasException.scala delete mode 100644 core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala delete mode 100644 core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala delete mode 100644 core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java index 9d7ebd4..1fff107 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java @@ -20,8 +20,12 @@ public class LeaderNotAvailableException extends InvalidMetadataException { private static final long serialVersionUID = 1L; - public LeaderNotAvailableException(String message) { - super(message); + public LeaderNotAvailableException(String message) { super(message); } + + public LeaderNotAvailableException(Throwable cause) { super(cause); } + + public LeaderNotAvailableException(String message, Throwable cause) { + super(message, cause); } } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java deleted file mode 100644 index 0be2f50..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.errors; - -/** - * The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server. - */ -public class OffsetMetadataTooLarge extends ApiException { - - private static final long serialVersionUID = 1L; - - public OffsetMetadataTooLarge() { - } - - public OffsetMetadataTooLarge(String message) { - super(message); - } - - public OffsetMetadataTooLarge(Throwable cause) { - super(cause); - } - - public OffsetMetadataTooLarge(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLargeException.java new file mode 100644 index 0000000..3a08895 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLargeException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server. + */ +public class OffsetMetadataTooLargeException extends ApiException { + + private static final long serialVersionUID = 1L; + + public OffsetMetadataTooLargeException() { + } + + public OffsetMetadataTooLargeException(String message) { + super(message); + } + + public OffsetMetadataTooLargeException(Throwable cause) { super(cause); } + + public OffsetMetadataTooLargeException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java index 6c639a9..0a76013 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java @@ -13,7 +13,7 @@ package org.apache.kafka.common.errors; /** - * A retryable exception is a transient exception that if retried may succeed. + * A retriable exception is a transient exception that if retried may succeed. */ public abstract class RetriableException extends ApiException { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a8deac4..1882fc3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.NotLeaderForPartitionException; -import org.apache.kafka.common.errors.OffsetMetadataTooLarge; +import org.apache.kafka.common.errors.OffsetMetadataTooLargeException; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -58,7 +58,7 @@ public enum Errors { REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), - OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), + OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLargeException("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")), CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")), diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e6ad8be..ed93611 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -29,6 +29,9 @@ import kafka.message.ByteBufferMessageSet import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock +import org.apache.kafka.common.errors.NotEnoughReplicasException +import org.apache.kafka.common.errors.NotLeaderForPartitionException + import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index eedc2f5..75e86d8 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -20,6 +20,8 @@ package kafka.common import kafka.message.InvalidMessageException import java.nio.ByteBuffer import scala.Predef._ +import org.apache.kafka.common.errors._ + /** * A bi-directional mapping between error codes and exceptions diff --git a/core/src/main/scala/kafka/common/InvalidTopicException.scala b/core/src/main/scala/kafka/common/InvalidTopicException.scala deleted file mode 100644 index 59f8874..0000000 --- a/core/src/main/scala/kafka/common/InvalidTopicException.scala +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -class InvalidTopicException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala deleted file mode 100644 index c4f9def..0000000 --- a/core/src/main/scala/kafka/common/NotEnoughReplicasAfterAppendException.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Number of insync replicas for the partition is lower than min.insync.replicas - * This exception is raised when the low ISR size is discovered *after* the message - * was already appended to the log. Producer retries will cause duplicates. - */ -class NotEnoughReplicasAfterAppendException(message: String) extends RuntimeException(message) { - def this() = this(null) -} diff --git a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala b/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala deleted file mode 100644 index bfbe0ee..0000000 --- a/core/src/main/scala/kafka/common/NotEnoughReplicasException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Message was rejected because number of insync replicas for the partition is lower than min.insync.replicas - */ -class NotEnoughReplicasException(message: String) extends RuntimeException(message) { - def this() = this(null) -} diff --git a/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala b/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala deleted file mode 100644 index b4558f8..0000000 --- a/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a request is made for partition on a broker that is NOT a leader for that partition - */ -class NotLeaderForPartitionException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala b/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala deleted file mode 100644 index 50edb27..0000000 --- a/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Indicates the client has specified offset metadata that exceeds the configured - * maximum size in bytes - */ -class OffsetMetadataTooLargeException(message: String) extends RuntimeException(message) { - def this() = this(null) -} - diff --git a/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala b/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala deleted file mode 100644 index 0a2514c..0000000 --- a/core/src/main/scala/kafka/common/OffsetOutOfRangeException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Indicates the client has requested a range no longer available on the server - */ -class OffsetOutOfRangeException(message: String) extends RuntimeException(message) { - def this() = this(null) -} - diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index ad75978..d182e03 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -19,6 +19,7 @@ package kafka.common import util.matching.Regex import kafka.server.OffsetManager +import org.apache.kafka.common.errors.InvalidTopicException object Topic { diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 846023b..83d680b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -27,6 +27,8 @@ import java.io.{IOException, File} import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import java.text.NumberFormat +import org.apache.kafka.common.errors.OffsetOutOfRangeException + import scala.collection.JavaConversions import com.yammer.metrics.core.Gauge diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index dd602ee..9186620 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -20,7 +20,7 @@ package kafka.server import kafka.api.FetchResponsePartitionData import kafka.api.PartitionFetchInfo import kafka.common.UnknownTopicOrPartitionException -import kafka.common.NotLeaderForPartitionException +import org.apache.kafka.common.errors.NotLeaderForPartitionException import kafka.common.TopicAndPartition import scala.collection._ diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6ee7d88..e89d278 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,6 +17,7 @@ package kafka.server +import org.apache.kafka.common.errors.NotLeaderForPartitionException import org.apache.kafka.common.requests.JoinGroupResponse import org.apache.kafka.common.requests.HeartbeatResponse import org.apache.kafka.common.TopicPartition diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fb948b9..ac61d84 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -37,6 +37,8 @@ import scala.collection.Set import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge +import org.apache.kafka.common.errors.InvalidTopicException +import org.apache.kafka.common.errors.NotLeaderForPartitionException /* * Result metadata of a log append operation on the log diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index e19b8b2..70cfc5d 100644 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -22,6 +22,7 @@ import kafka.common._ import kafka.message._ import kafka.log._ import kafka.utils._ +import org.apache.kafka.common.errors.OffsetOutOfRangeException /** * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala index 0fb2588..4f6c58e 100644 --- a/core/src/test/scala/unit/kafka/common/TopicTest.scala +++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala @@ -19,7 +19,8 @@ package unit.kafka.common import junit.framework.Assert._ import collection.mutable.ArrayBuffer -import kafka.common.{Topic, InvalidTopicException} +import kafka.common.Topic +import org.apache.kafka.common.errors.InvalidTopicException import org.junit.Test class TopicTest { diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index aeb7a19..ec7fd78 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -22,13 +22,14 @@ import junit.framework.Assert._ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.producer.{KeyedMessage, Producer} +import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.apache.log4j.{Level, Logger} import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ import kafka.admin.AdminUtils -import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} +import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException} import kafka.utils.{StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 90cd530..a847de9 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -19,6 +19,7 @@ package kafka.log import java.io._ import junit.framework.Assert._ +import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.junit.Test import org.scalatest.junit.JUnit3Suite import kafka.server.{BrokerState, OffsetCheckpoint} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c2dd8eb..9c05f8c 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -20,10 +20,11 @@ package kafka.log import java.io._ import java.util.concurrent.atomic._ import junit.framework.Assert._ +import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import kafka.message._ -import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException} +import kafka.common.{MessageSizeTooLargeException,MessageSetSizeTooLargeException} import kafka.utils._ import kafka.server.KafkaConfig -- 1.8.4