diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala new file mode 100644 index 0000000..f40e19f --- /dev/null +++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import org.apache.kafka.common.requests.AbstractRequestResponse +import kafka.api.ApiUtils._ + +private[kafka] abstract class GenericRequestAndHeader(val versionId: Short, + val correlationId: Int, + val clientId: String, + val body: AbstractRequestResponse, + val name: String, + override val requestId: Option[Short] = None) + extends RequestOrResponse(requestId) { + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + buffer.putInt(correlationId) + writeShortString(buffer, clientId) + body.writeTo(buffer) + } + + def sizeInBytes(): Int = { + 2 /* version id */ + + 4 /* correlation id */ + + (2 + clientId.length) /* client id */ + + body.sizeOf(); + } + + override def toString(): String = { + describe(true) + } + + override def describe(details: Boolean): String = { + val strBuffer = new StringBuilder + strBuffer.append("Name: " + name) + strBuffer.append("; Version: " + versionId) + strBuffer.append("; CorrelationId: " + correlationId) + strBuffer.append("; ClientId: " + clientId) + strBuffer.append("; Body: " + body.toString) + strBuffer.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala deleted file mode 100644 index fb022e8..0000000 --- a/core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import org.apache.kafka.common.requests.AbstractRequestResponse - -private[kafka] abstract class GenericRequestOrResponseAndHeader(val header: AbstractRequestResponse, - val body: AbstractRequestResponse, - val name: String, - override val requestId: Option[Short] = None) - extends RequestOrResponse(requestId) { - - def writeTo(buffer: ByteBuffer) { - header.writeTo(buffer) - body.writeTo(buffer) - } - - def sizeInBytes(): Int = { - header.sizeOf() + body.sizeOf(); - } - - override def toString(): String = { - describe(true) - } - - override def describe(details: Boolean): String = { - val strBuffer = new StringBuilder - strBuffer.append("Name: " + name) - strBuffer.append("; header: " + header.toString) - strBuffer.append("; body: " + body.toString) - strBuffer.toString() - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala new file mode 100644 index 0000000..a4879e2 --- /dev/null +++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import org.apache.kafka.common.requests.AbstractRequestResponse + +private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int, + val body: AbstractRequestResponse, + val name: String, + override val requestId: Option[Short] = None) + extends RequestOrResponse(requestId) { + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(correlationId) + body.writeTo(buffer) + } + + def sizeInBytes(): Int = { + 4 /* correlation id */ + + body.sizeOf(); + } + + override def toString(): String = { + describe(true) + } + + override def describe(details: Boolean): String = { + val strBuffer = new StringBuilder + strBuffer.append("Name: " + name) + strBuffer.append("; CorrelationId: " + correlationId) + strBuffer.append("; Body: " + body.toString) + strBuffer.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala index 932418b..f168d9f 100644 --- a/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala +++ b/core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala @@ -16,24 +16,30 @@ package kafka.api import java.nio.ByteBuffer import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import org.apache.kafka.common.requests.{HeartbeatResponse, HeartbeatRequest} +import kafka.api.ApiUtils._ import kafka.network.RequestChannel.Response -import org.apache.kafka.common.requests.{HeartbeatResponse, ResponseHeader, HeartbeatRequest, RequestHeader} +import scala.Some object HeartbeatRequestAndHeader { def readFrom(buffer: ByteBuffer): HeartbeatRequestAndHeader = { - val header = RequestHeader.parse(buffer) + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = readShortString(buffer) val body = HeartbeatRequest.parse(buffer) - new HeartbeatRequestAndHeader(header, body) + new HeartbeatRequestAndHeader(versionId, correlationId, clientId, body) } } -case class HeartbeatRequestAndHeader(override val header: RequestHeader, override val body: HeartbeatRequest) - extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { +case class HeartbeatRequestAndHeader(override val versionId: Short, + override val correlationId: Int, + override val clientId: String, + override val body: HeartbeatRequest) + extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), Some(RequestKeys.HeartbeatKey)) { override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseHeader = new ResponseHeader(header.correlationId) val errorResponseBody = new HeartbeatResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(errorResponseHeader, errorResponseBody) + val errorHeartBeatResponseAndHeader = new HeartbeatResponseAndHeader(correlationId, errorResponseBody) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) } } diff --git a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala index 556f38d..9a71faa 100644 --- a/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala +++ b/core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala @@ -12,17 +12,17 @@ */ package kafka.api -import org.apache.kafka.common.requests.{ResponseHeader, HeartbeatResponse} +import org.apache.kafka.common.requests.HeartbeatResponse import java.nio.ByteBuffer object HeartbeatResponseAndHeader { def readFrom(buffer: ByteBuffer): HeartbeatResponseAndHeader = { - val header = ResponseHeader.parse(buffer) + val correlationId = buffer.getInt val body = HeartbeatResponse.parse(buffer) - new HeartbeatResponseAndHeader(header, body) + new HeartbeatResponseAndHeader(correlationId, body) } } -case class HeartbeatResponseAndHeader(override val header: ResponseHeader, override val body: HeartbeatResponse) - extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { +case class HeartbeatResponseAndHeader(override val correlationId: Int, override val body: HeartbeatResponse) + extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.HeartbeatKey), None) { } diff --git a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala index 9aea28c..3651e86 100644 --- a/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala +++ b/core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala @@ -17,24 +17,29 @@ import java.nio.ByteBuffer import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping import org.apache.kafka.common.requests._ +import kafka.api.ApiUtils._ import kafka.network.RequestChannel.Response import scala.Some object JoinGroupRequestAndHeader { def readFrom(buffer: ByteBuffer): JoinGroupRequestAndHeader = { - val header = RequestHeader.parse(buffer) + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = readShortString(buffer) val body = JoinGroupRequest.parse(buffer) - new JoinGroupRequestAndHeader(header, body) + new JoinGroupRequestAndHeader(versionId, correlationId, clientId, body) } } -case class JoinGroupRequestAndHeader(override val header: RequestHeader, override val body: JoinGroupRequest) - extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { +case class JoinGroupRequestAndHeader(override val versionId: Short, + override val correlationId: Int, + override val clientId: String, + override val body: JoinGroupRequest) + extends GenericRequestAndHeader(versionId, correlationId, clientId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), Some(RequestKeys.JoinGroupKey)) { override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponseHeader = new ResponseHeader(header.correlationId) val errorResponseBody = new JoinGroupResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(errorResponseHeader, errorResponseBody) + val errorHeartBeatResponseAndHeader = new JoinGroupResponseAndHeader(correlationId, errorResponseBody) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorHeartBeatResponseAndHeader))) } } diff --git a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala index 7389ae6..d0f07e0 100644 --- a/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala +++ b/core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala @@ -12,17 +12,17 @@ */ package kafka.api -import org.apache.kafka.common.requests.{JoinGroupResponse, ResponseHeader} +import org.apache.kafka.common.requests.JoinGroupResponse import java.nio.ByteBuffer object JoinGroupResponseAndHeader { def readFrom(buffer: ByteBuffer): JoinGroupResponseAndHeader = { - val header = ResponseHeader.parse(buffer) + val correlationId = buffer.getInt val body = JoinGroupResponse.parse(buffer) - new JoinGroupResponseAndHeader(header, body) + new JoinGroupResponseAndHeader(correlationId, body) } } -case class JoinGroupResponseAndHeader(override val header: ResponseHeader, override val body: JoinGroupResponse) - extends GenericRequestOrResponseAndHeader(header, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { +case class JoinGroupResponseAndHeader(override val correlationId: Int, override val body: JoinGroupResponse) + extends GenericResponseAndHeader(correlationId, body, RequestKeys.nameForKey(RequestKeys.JoinGroupKey), None) { } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 847a36b..cd16ced 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -196,29 +196,25 @@ object SerializationTestUtils { } def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = { - val header = new RequestHeader(ApiKeys.HEARTBEAT.id, 0.asInstanceOf[Short], "", 1) val body = new HeartbeatRequest("group1", 1, "consumer1") - HeartbeatRequestAndHeader(header, body) + HeartbeatRequestAndHeader(0.asInstanceOf[Short], 1, "", body) } def createHeartbeatResponseAndHeader: HeartbeatResponseAndHeader = { - val header = new ResponseHeader(1) val body = new HeartbeatResponse(0.asInstanceOf[Short]) - HeartbeatResponseAndHeader(header, body) + HeartbeatResponseAndHeader(1, body) } def createJoinGroupRequestAndHeader: JoinGroupRequestAndHeader = { import scala.collection.JavaConversions._ - val header = new RequestHeader(ApiKeys.JOIN_GROUP.id, 0.asInstanceOf[Short], "", 1) val body = new JoinGroupRequest("group1", 30000, List("topic1"), "consumer1", "strategy1"); - JoinGroupRequestAndHeader(header, body) + JoinGroupRequestAndHeader(0.asInstanceOf[Short], 1, "", body) } def createJoinGroupResponseAndHeader: JoinGroupResponseAndHeader = { import scala.collection.JavaConversions._ - val header = new ResponseHeader(1) val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1))) - JoinGroupResponseAndHeader(header, body) + JoinGroupResponseAndHeader(1, body) } }