From b8265e3d6be7cb0a939fb5f28a2b67efbaba301d Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 10:31:59 -0700 Subject: [PATCH 1/5] Support requests and responses using the common API in core modules --- .../kafka/api/HeartbeatRequestAndHeader.scala | 45 ---------------------- .../kafka/api/HeartbeatResponseAndHeader.scala | 28 -------------- .../kafka/api/JoinGroupRequestAndHeader.scala | 45 ---------------------- .../kafka/api/JoinGroupResponseAndHeader.scala | 28 -------------- 4 files changed, 146 deletions(-) delete mode 100644 core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala deleted file mode 100644 index f168d9f..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License.
- */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest} -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object HeartbeatRequestAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = HeartbeatRequest.parse(buffer) - new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class HeartbeatRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: HeartbeatRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala deleted file mode 100644 index 9a71faa..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.HeartbeatResponse -import java.nio.ByteBuffer - -object HeartbeatResponseAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = { - val correlationId = buffer.getInt - val body = HeartbeatResponse.parse(buffer) - new HeartbeatResponseAndHeader(correlationId, body) - } -} - -case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { -} diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala deleted file mode 100644 index 3651e86..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests._ -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object JoinGroupRequestAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = JoinGroupRequest.parse(buffer) - new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class JoinGroupRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: JoinGroupRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala deleted file mode 100644 index d0f07e0..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package kafka.api - -import org.apache.kafka.common.requests.JoinGroupResponse -import java.nio.ByteBuffer - -object JoinGroupResponseAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = { - val correlationId = buffer.getInt - val body = JoinGroupResponse.parse(buffer) - new JoinGroupResponseAndHeader(correlationId, body) - } -} - -case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { -} -- 1.9.5 (Apple Git-50.3) From 4f0ab2a8ec833e92a0dbd2fab02eca817063ae66 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 10:32:33 -0700 Subject: [PATCH 2/5] support requests and responses using Common api in core modules (missing files) --- core/src/main/scala/kafka/api/RequestKeys.scala | 4 +-- .../kafka/network/BoundedByteBufferSend.scala | 8 +++++ .../main/scala/kafka/network/RequestChannel.scala | 15 +++++++- core/src/main/scala/kafka/server/KafkaApis.scala | 42 +++++++++++++--------- .../api/RequestResponseSerializationTest.scala | 29 +-------------- 5 files changed, 49 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index c24c034..ef7a86e 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -46,9 +46,7 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), - JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom), - HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom) ) def nameForKey(key: Short): String = { diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index 55ecac2..b95b73b 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -21,6 +21,7 @@ import java.nio._ import java.nio.channels._ import kafka.utils._ import kafka.api.RequestOrResponse +import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { @@ -50,6 +51,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send buffer.rewind() } + def this(header: ResponseHeader, body: AbstractRequestResponse) = { + this(header.sizeOf + body.sizeOf) + header.writeTo(buffer) + body.writeTo(buffer) + buffer.rewind + } + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7b1db3d..811a97c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,9 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.kafka.common.protocol.ProtoUtils +import 
org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.requests.RequestHeader import org.apache.log4j.Logger @@ -47,7 +50,17 @@ object RequestChannel extends Logging { @volatile var responseCompleteTimeMs = -1L @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() - val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) + val requestObj = + if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) + RequestKeys.deserializerForKey(requestId)(buffer) + else + null + val (header, body) = if (requestObj == null) { + buffer.rewind + val localHeader = RequestHeader.parse(buffer) + (localHeader, ProtoUtils.requestSchema(localHeader.apiKey,localHeader.apiVersion).read(buffer).asInstanceOf[Struct]) + } else + (null, null) buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Processor %d received request : %s".format(processor, requestObj)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35af98f..1091c20 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,8 +17,15 @@ package kafka.server -import org.apache.kafka.common.requests.JoinGroupResponse -import org.apache.kafka.common.requests.HeartbeatResponse +import kafka.api.ConsumerMetadataRequest +import kafka.api.ConsumerMetadataResponse +import kafka.api.FetchRequest +import kafka.api.FetchResponse +import kafka.api.OffsetCommitRequest +import kafka.api.OffsetCommitResponse +import kafka.api.OffsetFetchRequest +import kafka.api.OffsetFetchResponse +import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ @@ -452,40 +459,41 @@ class KafkaApis(val requestChannel: RequestChannel, def handleJoinGroupRequest(request: RequestChannel.Request) { import JavaConversions._ - val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] + val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a join-group response def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer - val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList) - val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) } // let the coordinator to handle join-group coordinator.consumerJoinGroup( - joinGroupRequest.body.groupId(), - joinGroupRequest.body.consumerId(), - joinGroupRequest.body.topics().toList, - joinGroupRequest.body.sessionTimeout(), - joinGroupRequest.body.strategy(), + joinGroupRequest.groupId(), + joinGroupRequest.consumerId(), + joinGroupRequest.topics().toList, + joinGroupRequest.sessionTimeout(), + joinGroupRequest.strategy(), sendResponseCallback) } def handleHeartbeatRequest(request: 
RequestChannel.Request) { - val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] + val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { - val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val response = new HeartbeatResponse(errorCode) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response))) } // let the coordinator to handle heartbeat coordinator.consumerHeartbeat( - heartbeatRequest.body.groupId(), - heartbeatRequest.body.consumerId(), - heartbeatRequest.body.groupGenerationId(), + heartbeatRequest.groupId(), + heartbeatRequest.consumerId(), + heartbeatRequest.groupGenerationId(), sendResponseCallback) } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fba852a..7233120 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -194,28 +194,6 @@ object SerializationTestUtils { def createConsumerMetadataResponse: ConsumerMetadataResponse = { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0) } - - def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { - val body = new HeartbeatRequest("group1", 1, "consumer1") - HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = { - val body = new HeartbeatResponse(0.asInstanceOf[Short]) - HeartbeatResponseAndHeader(1, body) - } - - def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1"); - JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1))) - JoinGroupResponseAndHeader(1, body) - } } class RequestResponseSerializationTest extends JUnitSuite { @@ -238,10 +216,6 @@ class RequestResponseSerializationTest extends JUnitSuite { private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) - private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader - private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader - private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader - private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader @Test def testSerializationAndDeserialization() { @@ -253,8 +227,7 @@ class RequestResponseSerializationTest extends JUnitSuite { topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitResponse, 
offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, - consumerMetadataResponseNoCoordinator, heartbeatRequest, - heartbeatResponse, joinGroupRequest, joinGroupResponse) + consumerMetadataResponseNoCoordinator) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes) -- 1.9.5 (Apple Git-50.3) From 2d696a0ba519411e2bf201af4a464230212e46f8 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 15:39:07 -0700 Subject: [PATCH 3/5] added error handling and factory method for requests --- .../common/requests/AbstractRequestResponse.java | 36 ++++++++++++++++++++++ .../kafka/common/requests/HeartbeatRequest.java | 6 ++++ .../kafka/common/requests/JoinGroupRequest.java | 6 ++++ .../main/scala/kafka/network/RequestChannel.scala | 16 ++++++---- core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++- 5 files changed, 65 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java index 37aff6c..6a3faf1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -42,6 +43,15 @@ public abstract class AbstractRequestResponse { struct.writeTo(buffer); } + /** + * Get an error response for a request + * Supplying a default implementation since response objects are not supposed to implement this + * Note: This MUST be implemented for request objects used by core module. 
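+     * Illustration (matching the overrides added to HeartbeatRequest and JoinGroupRequest
+     * in this patch): HeartbeatRequest returns new HeartbeatResponse(Errors.forException(e).code()),
+     * giving the broker a typed error body to send back to the client.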
+ */ + public AbstractRequestResponse getErrorResponse(Throwable e) { + return null; + } + @Override public String toString() { return struct.toString(); @@ -63,4 +73,30 @@ public abstract class AbstractRequestResponse { AbstractRequestResponse other = (AbstractRequestResponse) obj; return struct.equals(other.struct); } + + public static AbstractRequestResponse getRequest(int requestId, ByteBuffer buffer) { + switch (ApiKeys.forId(requestId)) { + case PRODUCE: + return ProduceRequest.parse(buffer); + case FETCH: + return FetchRequest.parse(buffer); + case LIST_OFFSETS: + return ListOffsetRequest.parse(buffer); + case METADATA: + return MetadataRequest.parse(buffer); + case OFFSET_COMMIT: + return OffsetCommitRequest.parse(buffer); + case OFFSET_FETCH: + return OffsetFetchRequest.parse(buffer); + case CONSUMER_METADATA: + return ConsumerMetadataRequest.parse(buffer); + case JOIN_GROUP: + return JoinGroupRequest.parse(buffer); + case HEARTBEAT: + return HeartbeatRequest.parse(buffer); + default: + return null; + } + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 6943878..f774d16 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -62,4 +63,9 @@ public class HeartbeatRequest extends AbstractRequestResponse { public static HeartbeatRequest parse(ByteBuffer buffer) { return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer)); } + + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new HeartbeatResponse(Errors.forException(e).code()); + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 1ebc188..2e47023 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -87,4 +88,9 @@ public class JoinGroupRequest extends AbstractRequestResponse { public static JoinGroupRequest parse(ByteBuffer buffer) { return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer)); } + + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new JoinGroupResponse(Errors.forException(e).code()); + } } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 811a97c..e2be7bf 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,9 +26,9 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import 
kafka.message.ByteBufferMessageSet import java.net._ -import org.apache.kafka.common.protocol.ProtoUtils +import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils} import org.apache.kafka.common.protocol.types.Struct -import org.apache.kafka.common.requests.RequestHeader +import org.apache.kafka.common.requests.{ResponseHeader, AbstractRequestResponse, RequestHeader} import org.apache.log4j.Logger @@ -55,12 +55,16 @@ object RequestChannel extends Logging { RequestKeys.deserializerForKey(requestId)(buffer) else null - val (header, body) = if (requestObj == null) { + val header: RequestHeader = if (requestObj == null) { buffer.rewind - val localHeader = RequestHeader.parse(buffer) - (localHeader, ProtoUtils.requestSchema(localHeader.apiKey,localHeader.apiVersion).read(buffer).asInstanceOf[Struct]) + RequestHeader.parse(buffer) } else - (null, null) + null + val body: AbstractRequestResponse = if (requestObj == null) + AbstractRequestResponse.getRequest(header.apiKey, buffer) + else + null + buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Processor %d received request : %s".format(processor, requestObj)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1091c20..023dfc3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -81,7 +81,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } catch { case e: Throwable => - request.requestObj.handleError(e, requestChannel, request) + if ( request.requestObj != null) + request.requestObj.handleError(e, requestChannel, request) + else { + val response = request.body.getErrorResponse(e) + val respHeader = new ResponseHeader(request.header.correlationId) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + } error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds -- 1.9.5 (Apple Git-50.3) From 5b42b538eb46203f7fd308cb3d3f27dde98840b8 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 25 Mar 2015 14:01:19 -0700 Subject: [PATCH 4/5] KAFKA-2047; Move the stream creation into concurrent mirror maker threads; reviewed by Guozhang Wang --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 49 +++++++++-------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4f3c4c8..ec07743 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -22,18 +22,18 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.{Collections, Properties} -import scala.collection.JavaConversions._ - import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector} +import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder import kafka.utils.{CommandLineUtils, Logging, Utils} import 
org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} + +import scala.collection.JavaConversions._ /** * The mirror maker has the following architecture: @@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { else new Blacklist(options.valueOf(blacklistOpt)) - // create a (connector->stream) sequence - val connectorStream = (0 until numStreams) map { - i => { - var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null - try { - // Creating just on stream per each connector instance - stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) - require(stream.size == 1) - } catch { - case t: Throwable => - fatal("Unable to create stream - shutting down mirror maker.", t) - connectors(i).shutdown() - } - connectors(i) -> stream(0) - } - } - // Create mirror maker threads mirrorMakerThreads = (0 until numStreams) map ( i => - new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i) + new MirrorMakerThread(connectors(i), filterSpec, i) ) // Create and initialize message handler @@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) { - properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue)) + val propertyValue = properties.getProperty(propertyName) + properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue)) if (properties.getProperty(propertyName) != defaultValue) - info("Property %s is overridden to %s - data loss or message reordering is possible.") + info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue)) } class MirrorMakerThread(connector: ZookeeperConsumerConnector, - stream: KafkaStream[Array[Byte], Array[Byte]], + filterSpec: TopicFilter, val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-thread-" + threadId private val shutdownLatch: CountDownLatch = new CountDownLatch(1) @@ -313,8 +297,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def run() { info("Starting mirror maker thread " + threadName) - val iter = stream.iterator() try { + // Creating one stream per each connector instance + val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) + require(streams.size == 1) + val stream = streams(0) + val iter = stream.iterator() + // TODO: Need to be changed after KAFKA-1660 is available. 
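+      // A minimal sketch of the consume-and-forward loop that follows, assuming
+      // consumer.timeout.ms is set so that iter.hasNext() throws
+      // ConsumerTimeoutException periodically and the exit flags get re-checked:
+      //   while (!exitingOnSendFailure && !shuttingDown) {
+      //     try {
+      //       while (iter.hasNext()) {
+      //         val data = iter.next() // MessageAndMetadata[Array[Byte], Array[Byte]]
+      //         producer.send(new ProducerRecord(data.topic, data.key(), data.message()))
+      //       }
+      //     } catch {
+      //       case _: ConsumerTimeoutException => // expected wake-up; loop and re-check flags
+      //     }
+      //   }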
while (!exitingOnSendFailure && !shuttingDown) { try { @@ -333,10 +322,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } catch { case t: Throwable => - fatal("Producer thread failure due to ", t) + fatal("Mirror maker thread failure due to ", t) } finally { shutdownLatch.countDown() - info("Producer thread stopped") + info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker if (!isShuttingdown.get()) { fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.") @@ -360,7 +349,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } catch { case ie: InterruptedException => - warn("Interrupt during shutdown of ProducerThread") + warn("Interrupt during shutdown of the mirror maker thread") } } @@ -370,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Mirror maker thread shutdown complete") } catch { case ie: InterruptedException => - warn("Shutdown of the producer thread interrupted") + warn("Shutdown of the mirror maker thread interrupted") } } } -- 1.9.5 (Apple Git-50.3) From e2250326ceb770f68b29d3c02d8a16c4c14a7ff7 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 25 Mar 2015 16:48:46 -0700 Subject: [PATCH 5/5] made getErrorResponse required for requests by adding another abstract class --- .../org/apache/kafka/common/PartitionInfo.java | 2 + .../kafka/common/requests/AbstractRequest.java | 62 ++++++++++++++++++++++ .../common/requests/AbstractRequestResponse.java | 36 ------------- .../common/requests/ConsumerMetadataRequest.java | 8 ++- .../apache/kafka/common/requests/FetchRequest.java | 15 +++++- .../kafka/common/requests/HeartbeatRequest.java | 2 +- .../kafka/common/requests/JoinGroupRequest.java | 2 +- .../kafka/common/requests/ListOffsetRequest.java | 15 +++++- .../kafka/common/requests/MetadataRequest.java | 15 +++++- .../kafka/common/requests/OffsetCommitRequest.java | 12 ++++- .../kafka/common/requests/OffsetFetchRequest.java | 17 +++++- .../kafka/common/requests/OffsetFetchResponse.java | 3 ++ .../kafka/common/requests/ProduceRequest.java | 19 ++++++- .../kafka/common/requests/ProduceResponse.java | 2 + .../main/scala/kafka/network/RequestChannel.scala | 32 +++++------ core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++- 16 files changed, 188 insertions(+), 62 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java index 321da8a..777fb83 100644 --- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java @@ -23,6 +23,8 @@ public class PartitionInfo { private final Node[] replicas; private final Node[] inSyncReplicas; + public static final int INVALID_PARTITION = -1; + public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) { this.topic = topic; this.partition = partition; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java new file mode 100644 index 0000000..5e5308e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public abstract class AbstractRequest extends AbstractRequestResponse { + + public AbstractRequest(Struct struct) { + super(struct); + } + + /** + * Get an error response for a request + */ + public abstract AbstractRequestResponse getErrorResponse(Throwable e); + + /** + * Factory method for getting a request object based on ApiKey ID and a buffer + */ + public static AbstractRequest getRequest(int requestId, ByteBuffer buffer) { + switch (ApiKeys.forId(requestId)) { + case PRODUCE: + return ProduceRequest.parse(buffer); + case FETCH: + return FetchRequest.parse(buffer); + case LIST_OFFSETS: + return ListOffsetRequest.parse(buffer); + case METADATA: + return MetadataRequest.parse(buffer); + case OFFSET_COMMIT: + return OffsetCommitRequest.parse(buffer); + case OFFSET_FETCH: + return OffsetFetchRequest.parse(buffer); + case CONSUMER_METADATA: + return ConsumerMetadataRequest.parse(buffer); + case JOIN_GROUP: + return JoinGroupRequest.parse(buffer); + case HEARTBEAT: + return HeartbeatRequest.parse(buffer); + default: + return null; + } + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java index 6a3faf1..37aff6c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java @@ -12,7 +12,6 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -43,15 +42,6 @@ public abstract class AbstractRequestResponse { struct.writeTo(buffer); } - /** - * Get an error response for a request - * Supplying a default implementation since response objects are not supposed to implement this - * Note: This MUST be implemented for request objects used by core module. 
- */ - public AbstractRequestResponse getErrorResponse(Throwable e) { - return null; - } - @Override public String toString() { return struct.toString(); @@ -73,30 +63,4 @@ public abstract class AbstractRequestResponse { AbstractRequestResponse other = (AbstractRequestResponse) obj; return struct.equals(other.struct); } - - public static AbstractRequestResponse getRequest(int requestId, ByteBuffer buffer) { - switch (ApiKeys.forId(requestId)) { - case PRODUCE: - return ProduceRequest.parse(buffer); - case FETCH: - return FetchRequest.parse(buffer); - case LIST_OFFSETS: - return ListOffsetRequest.parse(buffer); - case METADATA: - return MetadataRequest.parse(buffer); - case OFFSET_COMMIT: - return OffsetCommitRequest.parse(buffer); - case OFFSET_FETCH: - return OffsetFetchRequest.parse(buffer); - case CONSUMER_METADATA: - return ConsumerMetadataRequest.parse(buffer); - case JOIN_GROUP: - return JoinGroupRequest.parse(buffer); - case HEARTBEAT: - return HeartbeatRequest.parse(buffer); - default: - return null; - } - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java index 1651e75..c539ddd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java @@ -13,13 +13,14 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -public class ConsumerMetadataRequest extends AbstractRequestResponse { +public class ConsumerMetadataRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -38,6 +39,11 @@ public class ConsumerMetadataRequest extends AbstractRequestResponse { groupId = struct.getString(GROUP_ID_KEY_NAME); } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), null); + } + public String groupId() { return groupId; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 721e7d3..5a3c9e2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -20,12 +20,13 @@ import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; -public class FetchRequest extends AbstractRequestResponse { +public class FetchRequest extends AbstractRequest { public static final int CONSUMER_REPLICA_ID = -1; private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id); @@ -118,6 +119,18 @@ public class FetchRequest extends AbstractRequestResponse { } } + @Override + 
public AbstractRequestResponse getErrorResponse(Throwable e) { + Map responseData = new HashMap(); + + for (Map.Entry entry: fetchData.entrySet()) { + FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(), -1, ByteBuffer.allocate(0)); + responseData.put(entry.getKey(), partitionResponse); + } + + return new FetchResponse(responseData); + } + public int replicaId() { return replicaId; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index f774d16..51d081f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -public class HeartbeatRequest extends AbstractRequestResponse { +public class HeartbeatRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 2e47023..6795682 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class JoinGroupRequest extends AbstractRequestResponse { +public class JoinGroupRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id); private static final String GROUP_ID_KEY_NAME = "group_id"; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index e5dc92e..6eb89a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -29,7 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ListOffsetRequest extends AbstractRequestResponse { +public class ListOffsetRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id); private static final String REPLICA_ID_KEY_NAME = "replica_id"; @@ -105,6 +106,18 @@ public class ListOffsetRequest extends AbstractRequestResponse { } } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map responseData = new HashMap(); + + for (Map.Entry entry: offsetData.entrySet()) { + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), null); + responseData.put(entry.getKey(), partitionResponse); + } + + return new ListOffsetResponse(responseData); + } + 
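+    // Hypothetical caller-side view of the override above (variable names are
+    // illustrative only):
+    //   AbstractRequestResponse errorBody = listOffsetRequest.getErrorResponse(e);
+    //   // -> a ListOffsetResponse in which every requested partition carries
+    //   //    Errors.forException(e).code() and a null offset list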
public int replicaId() { return replicaId; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 5d5f52c..abdbc1f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -16,12 +16,15 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -public class MetadataRequest extends AbstractRequestResponse { +public class MetadataRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id); private static final String TOPICS_KEY_NAME = "topics"; @@ -43,6 +46,16 @@ public class MetadataRequest extends AbstractRequestResponse { } } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + List partitionInfos = new ArrayList(); + for (String topic: topics) { + partitionInfos.add(new PartitionInfo(topic, PartitionInfo.INVALID_PARTITION, null, null, null)); + } + Cluster responseCluster = new Cluster(new ArrayList(), partitionInfos); + return new MetadataResponse(responseCluster); + } + public List topics() { return topics; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 94e9d37..0987547 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -28,7 +29,7 @@ import org.apache.kafka.common.utils.CollectionUtils; /** * This wrapper supports both v0 and v1 of OffsetCommitRequest. 
*/ -public class OffsetCommitRequest extends AbstractRequestResponse { +public class OffsetCommitRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -154,6 +155,15 @@ public class OffsetCommitRequest extends AbstractRequestResponse { consumerId = DEFAULT_CONSUMER_ID; } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map responseData = new HashMap(); + for (Map.Entry entry: offsetData.entrySet()) { + responseData.put(entry.getKey(), Errors.forException(e).code()); + } + return new OffsetCommitResponse(responseData); + } + public String groupId() { return groupId; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 16c807c..deec1fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -21,13 +22,14 @@ import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; /** * This wrapper supports both v0 and v1 of OffsetFetchRequest. */ -public class OffsetFetchRequest extends AbstractRequestResponse { +public class OffsetFetchRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id); private static final String GROUP_ID_KEY_NAME = "group_id"; @@ -85,6 +87,19 @@ public class OffsetFetchRequest extends AbstractRequestResponse { groupId = struct.getString(GROUP_ID_KEY_NAME); } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + Map responseData = new HashMap(); + + for (TopicPartition partition: partitions) { + responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, + OffsetFetchResponse.NO_METADATA, + Errors.forException(e).code())); + } + + return new OffsetFetchResponse(responseData); + } + public String groupId() { return groupId; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index f10c246..512a0ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -41,6 +41,9 @@ public class OffsetFetchResponse extends AbstractRequestResponse { private static final String METADATA_KEY_NAME = "metadata"; private static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final long INVALID_OFFSET = -1L; + public static final String NO_METADATA = ""; + /** * Possible error code: * diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 995f89f..fabeae3 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -26,7 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ProduceRequest extends AbstractRequestResponse { +public class ProduceRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id); private static final String ACKS_KEY_NAME = "acks"; @@ -88,6 +89,22 @@ public class ProduceRequest extends AbstractRequestResponse { timeout = struct.getInt(TIMEOUT_KEY_NAME); } + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + + /* In case the producer doesn't actually want any response */ + if (acks == 0) + return null; + + Map responseMap = new HashMap(); + + for (Map.Entry entry: partitionRecords.entrySet()) { + responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET)); + } + + return new ProduceResponse(responseMap); + } + public short acks() { return acks; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 4b67f70..37ec0b7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -38,6 +38,8 @@ public class ProduceResponse extends AbstractRequestResponse { private static final String PARTITION_KEY_NAME = "partition"; private static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final long INVALID_OFFSET = -1L; + /** * Possible error code: * diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index e2be7bf..bc73540 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,9 +26,7 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ -import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils} -import org.apache.kafka.common.protocol.types.Struct -import org.apache.kafka.common.requests.{ResponseHeader, AbstractRequestResponse, RequestHeader} +import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger @@ -51,19 +49,21 @@ object RequestChannel extends Logging { @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() val requestObj = - if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) - RequestKeys.deserializerForKey(requestId)(buffer) - else - null - val header: RequestHeader = if (requestObj == null) { - buffer.rewind - RequestHeader.parse(buffer) - } else - null - val body: AbstractRequestResponse = if (requestObj == null) - AbstractRequestResponse.getRequest(header.apiKey, buffer) - else - null + if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) + 
RequestKeys.deserializerForKey(requestId)(buffer) + else + null + val header: RequestHeader = + if (requestObj == null) { + buffer.rewind + RequestHeader.parse(buffer) + } else + null + val body: AbstractRequest = + if (requestObj == null) + AbstractRequest.getRequest(header.apiKey, buffer) + else + null buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 023dfc3..c7ab526 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -86,7 +86,13 @@ class KafkaApis(val requestChannel: RequestChannel, else { val response = request.body.getErrorResponse(e) val respHeader = new ResponseHeader(request.header.correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + + /* If request doesn't have a default error response, we just close the connection. + For example, when produce request has acks set to 0 */ + if (response == null) + requestChannel.closeConnection(request.processor, request) + else + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) } error("error when handling request %s".format(request.requestObj), e) } finally -- 1.9.5 (Apple Git-50.3)
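Taken together, the series moves the broker to a single framing for the new APIs: a common RequestHeader, a body chosen by the AbstractRequest.getRequest factory, and a per-request getErrorResponse for the failure path. A minimal round-trip sketch in Scala (not part of the patches; the RequestHeader constructor shown, (apiKey, clientId, correlationId), is an assumption about the client API of this vintage):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.requests.{AbstractRequest, HeartbeatRequest, RequestHeader}

    // Client side: frame a heartbeat as header + body, mirroring what the new
    // BoundedByteBufferSend constructor does for responses.
    val body = new HeartbeatRequest("group1", 1, "consumer1")
    val header = new RequestHeader(ApiKeys.HEARTBEAT.id, "clientA", 42) // (apiKey, clientId, correlationId)
    val buffer = ByteBuffer.allocate(header.sizeOf + body.sizeOf)
    header.writeTo(buffer)
    body.writeTo(buffer)
    buffer.rewind()

    // Broker side: RequestChannel.Request parses the header, then the factory
    // picks the body type from the api key.
    val parsedHeader = RequestHeader.parse(buffer)
    val parsedBody = AbstractRequest.getRequest(parsedHeader.apiKey, buffer)

    // On an unhandled exception, KafkaApis asks the request itself for the error
    // body; a null return (e.g. a produce request with acks == 0) means "close the
    // connection instead of responding".
    val errorBody = parsedBody.getErrorResponse(new Exception("simulated failure"))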