From b8265e3d6be7cb0a939fb5f28a2b67efbaba301d Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 10:31:59 -0700 Subject: [PATCH 1/4] support requests and responses using Common api in core modules --- .../kafka/api/HeartbeatRequestAndHeader.scala | 45 ---------------------- .../kafka/api/HeartbeatResponseAndHeader.scala | 28 -------------- .../kafka/api/JoinGroupRequestAndHeader.scala | 45 ---------------------- .../kafka/api/JoinGroupResponseAndHeader.scala | 28 -------------- 4 files changed, 146 deletions(-) delete mode 100644 core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala deleted file mode 100644 index f168d9f..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest} -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object HeartbeatRequestAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = HeartbeatRequest.parse(buffer) - new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class HeartbeatRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: HeartbeatRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala deleted file mode 100644 index 9a71faa..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.HeartbeatResponse -import java.nio.ByteBuffer - -object HeartbeatResponseAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = { - val correlationId = buffer.getInt - val body = HeartbeatResponse.parse(buffer) - new HeartbeatResponseAndHeader(correlationId, body) - } -} - -case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { -} diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala deleted file mode 100644 index 3651e86..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests._ -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object JoinGroupRequestAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = JoinGroupRequest.parse(buffer) - new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class JoinGroupRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: JoinGroupRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala deleted file mode 100644 index d0f07e0..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.JoinGroupResponse -import java.nio.ByteBuffer - -object JoinGroupResponseAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = { - val correlationId = buffer.getInt - val body = JoinGroupResponse.parse(buffer) - new JoinGroupResponseAndHeader(correlationId, body) - } -} - -case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { -} -- 1.9.5 (Apple Git-50.3) From 4f0ab2a8ec833e92a0dbd2fab02eca817063ae66 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 10:32:33 -0700 Subject: [PATCH 2/4] support requests and responses using Common api in core modules (missing files) --- core/src/main/scala/kafka/api/RequestKeys.scala | 4 +-- .../kafka/network/BoundedByteBufferSend.scala | 8 +++++ .../main/scala/kafka/network/RequestChannel.scala | 15 +++++++- core/src/main/scala/kafka/server/KafkaApis.scala | 42 +++++++++++++--------- .../api/RequestResponseSerializationTest.scala | 29 +-------------- 5 files changed, 49 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index c24c034..ef7a86e 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -46,9 +46,7 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), - JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom), - HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom) ) def nameForKey(key: Short): String = { diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index 55ecac2..b95b73b 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -21,6 +21,7 @@ import java.nio._ import java.nio.channels._ import kafka.utils._ import kafka.api.RequestOrResponse +import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { @@ -50,6 +51,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send buffer.rewind() } + def this(header: ResponseHeader, body: AbstractRequestResponse) = { + this(header.sizeOf + body.sizeOf) + header.writeTo(buffer) + body.writeTo(buffer) + buffer.rewind + } + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7b1db3d..811a97c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,9 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.kafka.common.protocol.ProtoUtils +import org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.requests.RequestHeader import org.apache.log4j.Logger @@ -47,7 +50,17 @@ object RequestChannel extends Logging { @volatile var responseCompleteTimeMs = -1L @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() - val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) + val requestObj = + if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) + RequestKeys.deserializerForKey(requestId)(buffer) + else + null + val (header, body) = if (requestObj == null) { + buffer.rewind + val localHeader = RequestHeader.parse(buffer) + (localHeader, ProtoUtils.requestSchema(localHeader.apiKey,localHeader.apiVersion).read(buffer).asInstanceOf[Struct]) + } else + (null, null) buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Processor %d received request : %s".format(processor, requestObj)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35af98f..1091c20 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,8 +17,15 @@ package kafka.server -import org.apache.kafka.common.requests.JoinGroupResponse -import org.apache.kafka.common.requests.HeartbeatResponse +import kafka.api.ConsumerMetadataRequest +import kafka.api.ConsumerMetadataResponse +import kafka.api.FetchRequest +import kafka.api.FetchResponse +import kafka.api.OffsetCommitRequest +import kafka.api.OffsetCommitResponse +import kafka.api.OffsetFetchRequest +import kafka.api.OffsetFetchResponse +import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ @@ -452,40 +459,41 @@ class KafkaApis(val requestChannel: RequestChannel, def handleJoinGroupRequest(request: RequestChannel.Request) { import JavaConversions._ - val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] + val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a join-group response def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer - val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList) - val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) } // let the coordinator to handle join-group coordinator.consumerJoinGroup( - joinGroupRequest.body.groupId(), - joinGroupRequest.body.consumerId(), - joinGroupRequest.body.topics().toList, - joinGroupRequest.body.sessionTimeout(), - joinGroupRequest.body.strategy(), + joinGroupRequest.groupId(), + joinGroupRequest.consumerId(), + joinGroupRequest.topics().toList, + joinGroupRequest.sessionTimeout(), + joinGroupRequest.strategy(), sendResponseCallback) } def handleHeartbeatRequest(request: RequestChannel.Request) { - val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] + val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { - val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val response = new HeartbeatResponse(errorCode) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response))) } // let the coordinator to handle heartbeat coordinator.consumerHeartbeat( - heartbeatRequest.body.groupId(), - heartbeatRequest.body.consumerId(), - heartbeatRequest.body.groupGenerationId(), + heartbeatRequest.groupId(), + heartbeatRequest.consumerId(), + heartbeatRequest.groupGenerationId(), sendResponseCallback) } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fba852a..7233120 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -194,28 +194,6 @@ object SerializationTestUtils { def createConsumerMetadataResponse: ConsumerMetadataResponse = { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0) } - - def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { - val body = new HeartbeatRequest("group1", 1, "consumer1") - HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = { - val body = new HeartbeatResponse(0.asInstanceOf[Short]) - HeartbeatResponseAndHeader(1, body) - } - - def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1"); - JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1))) - JoinGroupResponseAndHeader(1, body) - } } class RequestResponseSerializationTest extends JUnitSuite { @@ -238,10 +216,6 @@ class RequestResponseSerializationTest extends JUnitSuite { private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) - private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader - private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader - private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader - private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader @Test def testSerializationAndDeserialization() { @@ -253,8 +227,7 @@ class RequestResponseSerializationTest extends JUnitSuite { topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, - consumerMetadataResponseNoCoordinator, heartbeatRequest, - heartbeatResponse, joinGroupRequest, joinGroupResponse) + consumerMetadataResponseNoCoordinator) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes) -- 1.9.5 (Apple Git-50.3) From 2d696a0ba519411e2bf201af4a464230212e46f8 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 15:39:07 -0700 Subject: [PATCH 3/4] added error handling and factory method for requests --- .../common/requests/AbstractRequestResponse.java | 36 ++++++++++++++++++++++ .../kafka/common/requests/HeartbeatRequest.java | 6 ++++ .../kafka/common/requests/JoinGroupRequest.java | 6 ++++ .../main/scala/kafka/network/RequestChannel.scala | 16 ++++++---- core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++- 5 files changed, 65 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java index 37aff6c..6a3faf1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -42,6 +43,15 @@ public abstract class AbstractRequestResponse { struct.writeTo(buffer); } + /** + * Get an error response for a request + * Supplying a default implementation since response objects are not supposed to implement this + * Note: This MUST be implemented for request objects used by core module. + */ + public AbstractRequestResponse getErrorResponse(Throwable e) { + return null; + } + @Override public String toString() { return struct.toString(); @@ -63,4 +73,30 @@ public abstract class AbstractRequestResponse { AbstractRequestResponse other = (AbstractRequestResponse) obj; return struct.equals(other.struct); } + + public static AbstractRequestResponse getRequest(int requestId, ByteBuffer buffer) { + switch (ApiKeys.forId(requestId)) { + case PRODUCE: + return ProduceRequest.parse(buffer); + case FETCH: + return FetchRequest.parse(buffer); + case LIST_OFFSETS: + return ListOffsetRequest.parse(buffer); + case METADATA: + return MetadataRequest.parse(buffer); + case OFFSET_COMMIT: + return OffsetCommitRequest.parse(buffer); + case OFFSET_FETCH: + return OffsetFetchRequest.parse(buffer); + case CONSUMER_METADATA: + return ConsumerMetadataRequest.parse(buffer); + case JOIN_GROUP: + return JoinGroupRequest.parse(buffer); + case HEARTBEAT: + return HeartbeatRequest.parse(buffer); + default: + return null; + } + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 6943878..f774d16 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -62,4 +63,9 @@ public class HeartbeatRequest extends AbstractRequestResponse { public static HeartbeatRequest parse(ByteBuffer buffer) { return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer)); } + + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new HeartbeatResponse(Errors.forException(e).code()); + } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 1ebc188..2e47023 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -13,6 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -87,4 +88,9 @@ public class JoinGroupRequest extends AbstractRequestResponse { public static JoinGroupRequest parse(ByteBuffer buffer) { return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer)); } + + @Override + public AbstractRequestResponse getErrorResponse(Throwable e) { + return new JoinGroupResponse(Errors.forException(e).code()); + } } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 811a97c..e2be7bf 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,9 +26,9 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ -import org.apache.kafka.common.protocol.ProtoUtils +import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils} import org.apache.kafka.common.protocol.types.Struct -import org.apache.kafka.common.requests.RequestHeader +import org.apache.kafka.common.requests.{ResponseHeader, AbstractRequestResponse, RequestHeader} import org.apache.log4j.Logger @@ -55,12 +55,16 @@ object RequestChannel extends Logging { RequestKeys.deserializerForKey(requestId)(buffer) else null - val (header, body) = if (requestObj == null) { + val header: RequestHeader = if (requestObj == null) { buffer.rewind - val localHeader = RequestHeader.parse(buffer) - (localHeader, ProtoUtils.requestSchema(localHeader.apiKey,localHeader.apiVersion).read(buffer).asInstanceOf[Struct]) + RequestHeader.parse(buffer) } else - (null, null) + null + val body: AbstractRequestResponse = if (requestObj == null) + AbstractRequestResponse.getRequest(header.apiKey, buffer) + else + null + buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Processor %d received request : %s".format(processor, requestObj)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1091c20..023dfc3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -81,7 +81,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } catch { case e: Throwable => - request.requestObj.handleError(e, requestChannel, request) + if ( request.requestObj != null) + request.requestObj.handleError(e, requestChannel, request) + else { + val response = request.body.getErrorResponse(e) + val respHeader = new ResponseHeader(request.header.correlationId) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + } error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds -- 1.9.5 (Apple Git-50.3) From 5b42b538eb46203f7fd308cb3d3f27dde98840b8 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 25 Mar 2015 14:01:19 -0700 Subject: [PATCH 4/4] KAFKA-2047; Move the stream creation into concurrent mirror maker threads; reviewed by Guozhang Wang --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 49 +++++++++-------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4f3c4c8..ec07743 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -22,18 +22,18 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.{Collections, Properties} -import scala.collection.JavaConversions._ - import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector} +import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder import kafka.utils.{CommandLineUtils, Logging, Utils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} + +import scala.collection.JavaConversions._ /** * The mirror maker has the following architecture: @@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { else new Blacklist(options.valueOf(blacklistOpt)) - // create a (connector->stream) sequence - val connectorStream = (0 until numStreams) map { - i => { - var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null - try { - // Creating just on stream per each connector instance - stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) - require(stream.size == 1) - } catch { - case t: Throwable => - fatal("Unable to create stream - shutting down mirror maker.", t) - connectors(i).shutdown() - } - connectors(i) -> stream(0) - } - } - // Create mirror maker threads mirrorMakerThreads = (0 until numStreams) map ( i => - new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i) + new MirrorMakerThread(connectors(i), filterSpec, i) ) // Create and initialize message handler @@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) { - properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue)) + val propertyValue = properties.getProperty(propertyName) + properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue)) if (properties.getProperty(propertyName) != defaultValue) - info("Property %s is overridden to %s - data loss or message reordering is possible.") + info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue)) } class MirrorMakerThread(connector: ZookeeperConsumerConnector, - stream: KafkaStream[Array[Byte], Array[Byte]], + filterSpec: TopicFilter, val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-thread-" + threadId private val shutdownLatch: CountDownLatch = new CountDownLatch(1) @@ -313,8 +297,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def run() { info("Starting mirror maker thread " + threadName) - val iter = stream.iterator() try { + // Creating one stream per each connector instance + val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) + require(streams.size == 1) + val stream = streams(0) + val iter = stream.iterator() + // TODO: Need to be changed after KAFKA-1660 is available. while (!exitingOnSendFailure && !shuttingDown) { try { @@ -333,10 +322,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } catch { case t: Throwable => - fatal("Producer thread failure due to ", t) + fatal("Mirror maker thread failure due to ", t) } finally { shutdownLatch.countDown() - info("Producer thread stopped") + info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker if (!isShuttingdown.get()) { fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.") @@ -360,7 +349,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } catch { case ie: InterruptedException => - warn("Interrupt during shutdown of ProducerThread") + warn("Interrupt during shutdown of the mirror maker thread") } } @@ -370,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Mirror maker thread shutdown complete") } catch { case ie: InterruptedException => - warn("Shutdown of the producer thread interrupted") + warn("Shutdown of the mirror maker thread interrupted") } } } -- 1.9.5 (Apple Git-50.3)