From b8265e3d6be7cb0a939fb5f28a2b67efbaba301d Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 10:31:59 -0700 Subject: [PATCH 1/2] support requests and responses using Common api in core modules --- .../kafka/api/HeartbeatRequestAndHeader.scala | 45 ---------------------- .../kafka/api/HeartbeatResponseAndHeader.scala | 28 -------------- .../kafka/api/JoinGroupRequestAndHeader.scala | 45 ---------------------- .../kafka/api/JoinGroupResponseAndHeader.scala | 28 -------------- 4 files changed, 146 deletions(-) delete mode 100644 core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala delete mode 100644 core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala deleted file mode 100644 index f168d9f..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest} -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object HeartbeatRequestAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = HeartbeatRequest.parse(buffer) - new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class HeartbeatRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: HeartbeatRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala deleted file mode 100644 index 9a71faa..0000000 --- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.HeartbeatResponse -import java.nio.ByteBuffer - -object HeartbeatResponseAndHeader { - def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = { - val correlationId = buffer.getInt - val body = HeartbeatResponse.parse(buffer) - new HeartbeatResponseAndHeader(correlationId, body) - } -} - -case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { -} diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala deleted file mode 100644 index 3651e86..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import org.apache.kafka.common.requests._ -import kafka.api.ApiUtils._ -import kafka.network.RequestChannel.Response -import scala.Some - -object JoinGroupRequestAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = readShortString(buffer) - val body = JoinGroupRequest.parse(buffer) - new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body) - } -} - -case class JoinGroupRequestAndHeader(override val versionId: Short, - override val correlationId: Int, - override val clientId: String, - override val body: JoinGroupRequest) - extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) - } -} diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala deleted file mode 100644 index d0f07e0..0000000 --- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import org.apache.kafka.common.requests.JoinGroupResponse -import java.nio.ByteBuffer - -object JoinGroupResponseAndHeader { - def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = { - val correlationId = buffer.getInt - val body = JoinGroupResponse.parse(buffer) - new JoinGroupResponseAndHeader(correlationId, body) - } -} - -case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse) - extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { -} -- 1.9.5 (Apple Git-50.3) From 4f0ab2a8ec833e92a0dbd2fab02eca817063ae66 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 24 Mar 2015 10:32:33 -0700 Subject: [PATCH 2/2] support requests and responses using Common api in core modules (missing files) --- core/src/main/scala/kafka/api/RequestKeys.scala | 4 +-- .../kafka/network/BoundedByteBufferSend.scala | 8 +++++ .../main/scala/kafka/network/RequestChannel.scala | 15 +++++++- core/src/main/scala/kafka/server/KafkaApis.scala | 42 +++++++++++++--------- .../api/RequestResponseSerializationTest.scala | 29 +-------------- 5 files changed, 49 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index c24c034..ef7a86e 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -46,9 +46,7 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom), - JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom), - HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom) + ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom) ) def nameForKey(key: Short): String = { diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index 55ecac2..b95b73b 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -21,6 +21,7 @@ import java.nio._ import java.nio.channels._ import kafka.utils._ import kafka.api.RequestOrResponse +import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { @@ -50,6 +51,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send buffer.rewind() } + def this(header: ResponseHeader, body: AbstractRequestResponse) = { + this(header.sizeOf + body.sizeOf) + header.writeTo(buffer) + body.writeTo(buffer) + buffer.rewind + } + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7b1db3d..811a97c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,9 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.kafka.common.protocol.ProtoUtils +import org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.requests.RequestHeader import org.apache.log4j.Logger @@ -47,7 +50,17 @@ object RequestChannel extends Logging { @volatile var responseCompleteTimeMs = -1L @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() - val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) + val requestObj = + if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId)) + RequestKeys.deserializerForKey(requestId)(buffer) + else + null + val (header, body) = if (requestObj == null) { + buffer.rewind + val localHeader = RequestHeader.parse(buffer) + (localHeader, ProtoUtils.requestSchema(localHeader.apiKey,localHeader.apiVersion).read(buffer).asInstanceOf[Struct]) + } else + (null, null) buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Processor %d received request : %s".format(processor, requestObj)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35af98f..1091c20 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,8 +17,15 @@ package kafka.server -import org.apache.kafka.common.requests.JoinGroupResponse -import org.apache.kafka.common.requests.HeartbeatResponse +import kafka.api.ConsumerMetadataRequest +import kafka.api.ConsumerMetadataResponse +import kafka.api.FetchRequest +import kafka.api.FetchResponse +import kafka.api.OffsetCommitRequest +import kafka.api.OffsetCommitResponse +import kafka.api.OffsetFetchRequest +import kafka.api.OffsetFetchResponse +import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ @@ -452,40 +459,41 @@ class KafkaApis(val requestChannel: RequestChannel, def handleJoinGroupRequest(request: RequestChannel.Request) { import JavaConversions._ - val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] + val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a join-group response def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer - val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.body.consumerId, partitionList) - val response = new JoinGroupResponseAndHeader(joinGroupRequest.correlationId, responseBody) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) } // let the coordinator to handle join-group coordinator.consumerJoinGroup( - joinGroupRequest.body.groupId(), - joinGroupRequest.body.consumerId(), - joinGroupRequest.body.topics().toList, - joinGroupRequest.body.sessionTimeout(), - joinGroupRequest.body.strategy(), + joinGroupRequest.groupId(), + joinGroupRequest.consumerId(), + joinGroupRequest.topics().toList, + joinGroupRequest.sessionTimeout(), + joinGroupRequest.strategy(), sendResponseCallback) } def handleHeartbeatRequest(request: RequestChannel.Request) { - val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] + val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest] + val respHeader = new ResponseHeader(request.header.correlationId) // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { - val response = new HeartbeatResponseAndHeader(heartbeatRequest.correlationId, new HeartbeatResponse(errorCode)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + val response = new HeartbeatResponse(errorCode) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response))) } // let the coordinator to handle heartbeat coordinator.consumerHeartbeat( - heartbeatRequest.body.groupId(), - heartbeatRequest.body.consumerId(), - heartbeatRequest.body.groupGenerationId(), + heartbeatRequest.groupId(), + heartbeatRequest.consumerId(), + heartbeatRequest.groupGenerationId(), sendResponseCallback) } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fba852a..7233120 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -194,28 +194,6 @@ object SerializationTestUtils { def createConsumerMetadataResponse: ConsumerMetadataResponse = { ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0) } - - def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { - val body = new HeartbeatRequest("group1", 1, "consumer1") - HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = { - val body = new HeartbeatResponse(0.asInstanceOf[Short]) - HeartbeatResponseAndHeader(1, body) - } - - def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1"); - JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body) - } - - def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = { - import scala.collection.JavaConversions._ - val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1))) - JoinGroupResponseAndHeader(1, body) - } } class RequestResponseSerializationTest extends JUnitSuite { @@ -238,10 +216,6 @@ class RequestResponseSerializationTest extends JUnitSuite { private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) - private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader - private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader - private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader - private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader @Test def testSerializationAndDeserialization() { @@ -253,8 +227,7 @@ class RequestResponseSerializationTest extends JUnitSuite { topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, - consumerMetadataResponseNoCoordinator, heartbeatRequest, - heartbeatResponse, joinGroupRequest, joinGroupResponse) + consumerMetadataResponseNoCoordinator) requestsAndResponses.foreach { original => val buffer = ByteBuffer.allocate(original.sizeInBytes) -- 1.9.5 (Apple Git-50.3)