From 46b3d2d392d288fe9c9693462706bea614e2b626 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 9 Apr 2015 14:29:08 -0700 Subject: [PATCH 1/3] replaced Recieve --- .../kafka/common/network/NetworkReceive.java | 34 ++++++++ .../scala/kafka/admin/ConsumerGroupCommand.scala | 2 +- core/src/main/scala/kafka/client/ClientUtils.scala | 2 +- .../main/scala/kafka/consumer/SimpleConsumer.scala | 19 ++--- .../consumer/ZookeeperConsumerConnector.scala | 4 +- .../controller/ControllerChannelManager.scala | 11 +-- .../main/scala/kafka/network/BlockingChannel.scala | 6 +- .../kafka/network/BoundedByteBufferReceive.scala | 90 ---------------------- core/src/main/scala/kafka/network/Handler.scala | 6 +- .../main/scala/kafka/network/SocketServer.scala | 8 +- .../main/scala/kafka/network/Transmission.scala | 21 +---- .../main/scala/kafka/producer/SyncProducer.scala | 13 ++-- core/src/main/scala/kafka/server/KafkaServer.scala | 8 +- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 2 +- .../test/scala/other/kafka/TestOffsetManager.scala | 6 +- 15 files changed, 84 insertions(+), 148 deletions(-) delete mode 100755 core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index fc0d168..5207807 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.network; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.ScatteringByteChannel; /** @@ -22,20 +23,38 @@ import java.nio.channels.ScatteringByteChannel; */ public class NetworkReceive implements Receive { + private final static int UNKNOWN_SOURCE = -1; + private final static int UNLIMITED = -1; + private final int source; private final ByteBuffer size; + private final long maxSize; private ByteBuffer buffer; + public NetworkReceive(int source, ByteBuffer buffer) { this.source = source; this.buffer = buffer; this.size = null; + this.maxSize = UNLIMITED; } public NetworkReceive(int source) { this.source = source; this.size = ByteBuffer.allocate(4); this.buffer = null; + this.maxSize = UNLIMITED; + } + + public NetworkReceive(long maxSize) { + this.source = UNKNOWN_SOURCE; + this.size = ByteBuffer.allocate(4); + this.buffer = null; + this.maxSize = maxSize; + } + + public NetworkReceive() { + this(UNKNOWN_SOURCE); } @Override @@ -55,6 +74,12 @@ public class NetworkReceive implements Receive { @Override public long readFrom(ScatteringByteChannel channel) throws IOException { + return readFromReadableChannel(channel); + } + + // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout + // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work + public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; if (size.hasRemaining()) { int bytesRead = channel.read(size); @@ -83,4 +108,13 @@ public class NetworkReceive implements Receive { return this.buffer; } + // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel + public long readCompletely(ReadableByteChannel channel) throws IOException { + int totalRead = 0; + while (!complete()) { + totalRead += readFromReadableChannel(channel); + } + return totalRead; + } + } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 1c3b380..05e7db1 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -174,7 +174,7 @@ object ConsumerGroupCommand { val offsetMap = mutable.Map[TopicAndPartition, Long]() val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index b66424b..18529b0 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -153,7 +153,7 @@ object ClientUtils extends Logging{ debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) queryChannel.send(ConsumerMetadataRequest(group)) val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) + val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.payload()) debug("Consumer metadata response: " + consumerMetadataResponse.toString) if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) coordinatorOpt = consumerMetadataResponse.coordinatorOpt diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index cbef84a..2fce6bd 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -21,6 +21,7 @@ import kafka.api._ import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} +import org.apache.kafka.common.network.{NetworkReceive, Receive} import org.apache.kafka.common.utils.Utils._ /** @@ -62,9 +63,9 @@ class SimpleConsumer(val host: String, } } - private def sendRequest(request: RequestOrResponse): Receive = { + private def sendRequest(request: RequestOrResponse): NetworkReceive = { lock synchronized { - var response: Receive = null + var response: NetworkReceive = null try { getOrMakeConnection() blockingChannel.send(request) @@ -89,12 +90,12 @@ class SimpleConsumer(val host: String, def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = sendRequest(request) - TopicMetadataResponse.readFrom(response.buffer) + TopicMetadataResponse.readFrom(response.payload()) } def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = { val response = sendRequest(request) - ConsumerMetadataResponse.readFrom(response.buffer) + ConsumerMetadataResponse.readFrom(response.payload()) } /** @@ -104,7 +105,7 @@ class SimpleConsumer(val host: String, * @return a set of fetched messages */ def fetch(request: FetchRequest): FetchResponse = { - var response: Receive = null + var response: NetworkReceive = null val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer aggregateTimer.time { @@ -112,7 +113,7 @@ class SimpleConsumer(val host: String, response = sendRequest(request) } } - val fetchResponse = FetchResponse.readFrom(response.buffer) + val fetchResponse = FetchResponse.readFrom(response.payload()) val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) @@ -124,7 +125,7 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetRequest]] object. * @return a [[kafka.api.OffsetResponse]] object. */ - def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) + def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload()) /** * Commit offsets for a topic @@ -135,7 +136,7 @@ class SimpleConsumer(val host: String, def commitOffsets(request: OffsetCommitRequest) = { // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before // we can commit offsets. - OffsetCommitResponse.readFrom(sendRequest(request).buffer) + OffsetCommitResponse.readFrom(sendRequest(request).payload()) } /** @@ -144,7 +145,7 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetFetchRequest]] object. * @return a [[kafka.api.OffsetFetchResponse]] object. */ - def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) + def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload()) private def getOrMakeConnection() { if(!isClosed && !blockingChannel.isConnected) { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e250b94..ae5181a 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -334,7 +334,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { kafkaCommitMeter.mark(offsetsToCommit.size) offsetsChannel.send(offsetCommitRequest) - val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) + val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) trace("Offset commit response: %s.".format(offsetCommitResponse)) val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { @@ -421,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, ensureOffsetManagerConnected() try { offsetsChannel.send(offsetFetchRequest) - val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload()) trace("Offset fetch response: %s.".format(offsetFetchResponse)) val (leaderChanged, loadInProgress) = diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 97acdb2..da71f07 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -16,8 +16,9 @@ */ package kafka.controller -import kafka.network.{Receive, BlockingChannel} +import kafka.network.BlockingChannel import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.network.NetworkReceive import collection.mutable.HashMap import kafka.cluster.Broker import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} @@ -120,7 +121,7 @@ class RequestSendThread(val controllerId: Int, val queueItem = queue.take() val request = queueItem._1 val callback = queueItem._2 - var receive: Receive = null + var receive: NetworkReceive = null try { lock synchronized { var isSendSuccessful = false @@ -147,11 +148,11 @@ class RequestSendThread(val controllerId: Int, var response: RequestOrResponse = null request.requestId.get match { case RequestKeys.LeaderAndIsrKey => - response = LeaderAndIsrResponse.readFrom(receive.buffer) + response = LeaderAndIsrResponse.readFrom(receive.payload()) case RequestKeys.StopReplicaKey => - response = StopReplicaResponse.readFrom(receive.buffer) + response = StopReplicaResponse.readFrom(receive.payload()) case RequestKeys.UpdateMetadataKey => - response = UpdateMetadataResponse.readFrom(receive.buffer) + response = UpdateMetadataResponse.readFrom(receive.payload()) } stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 6e2a38e..7c0d8fc 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -21,6 +21,7 @@ import java.net.InetSocketAddress import java.nio.channels._ import kafka.utils.{nonthreadsafe, Logging} import kafka.api.RequestOrResponse +import org.apache.kafka.common.network.NetworkReceive object BlockingChannel{ @@ -103,12 +104,13 @@ class BlockingChannel( val host: String, send.writeCompletely(writeChannel) } - def receive(): Receive = { + def receive(): NetworkReceive = { if(!connected) throw new ClosedChannelException() - val response = new BoundedByteBufferReceive() + val response = new NetworkReceive() response.readCompletely(readChannel) + response.payload().rewind() response } diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala deleted file mode 100755 index c0d7726..0000000 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ - -/** - * Represents a communication between the client and server - * - */ -@nonthreadsafe -private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging { - - private val sizeBuffer = ByteBuffer.allocate(4) - private var contentBuffer: ByteBuffer = null - - def this() = this(Int.MaxValue) - - var complete: Boolean = false - - /** - * Get the content buffer for this transmission - */ - def buffer: ByteBuffer = { - expectComplete() - contentBuffer - } - - /** - * Read the bytes in this response from the given channel - */ - def readFrom(channel: ReadableByteChannel): Int = { - expectIncomplete() - var read = 0 - // have we read the request size yet? - if(sizeBuffer.remaining > 0) - read += CoreUtils.read(channel, sizeBuffer) - // have we allocated the request buffer yet? - if(contentBuffer == null && !sizeBuffer.hasRemaining) { - sizeBuffer.rewind() - val size = sizeBuffer.getInt() - if(size <= 0) - throw new InvalidRequestException("%d is not a valid request size.".format(size)) - if(size > maxSize) - throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize)) - contentBuffer = byteBufferAllocate(size) - } - // if we have a buffer read some stuff into it - if(contentBuffer != null) { - read = CoreUtils.read(channel, contentBuffer) - // did we get everything? - if(!contentBuffer.hasRemaining) { - contentBuffer.rewind() - complete = true - } - } - read - } - - private def byteBufferAllocate(size: Int): ByteBuffer = { - var buffer: ByteBuffer = null - try { - buffer = ByteBuffer.allocate(size) - } catch { - case e: OutOfMemoryError => - error("OOME with size " + size, e) - throw e - case e2: Throwable => - throw e2 - } - buffer - } -} diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala index a030033..cd1d143 100644 --- a/core/src/main/scala/kafka/network/Handler.scala +++ b/core/src/main/scala/kafka/network/Handler.scala @@ -17,17 +17,19 @@ package kafka.network +import org.apache.kafka.common.network.NetworkReceive + private[kafka] object Handler { /** * A request handler is a function that turns an incoming * transmission into an outgoing transmission */ - type Handler = Receive => Option[Send] + type Handler = NetworkReceive => Option[Send] /** * A handler mapping finds the right Handler function for a given request */ - type HandlerMapping = (Short, Receive) => Handler + type HandlerMapping = (Short, NetworkReceive) => Handler } diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c5fec00..3998eb4 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -25,6 +25,7 @@ import java.io._ import java.nio.channels._ import kafka.cluster.EndPoint +import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection._ @@ -474,9 +475,9 @@ private[kafka] class Processor(val id: Int, def read(key: SelectionKey) { lruConnections.put(key, currentTimeNanos) val socketChannel = channelFor(key) - var receive = key.attachment.asInstanceOf[Receive] + var receive = key.attachment.asInstanceOf[NetworkReceive] if(key.attachment == null) { - receive = new BoundedByteBufferReceive(maxRequestSize) + receive = new NetworkReceive(maxRequestSize.toLong) key.attach(receive) } val read = receive.readFrom(socketChannel) @@ -487,7 +488,8 @@ private[kafka] class Processor(val id: Int, } else if(receive.complete) { val port = socketChannel.socket().getLocalPort val protocol = portToProtocol.get(port) - val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol) + receive.payload().rewind() + val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.payload(), startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol) requestChannel.sendRequest(req) key.attach(null) // explicitly reset interest ops to not READ, no need to wake up the selector just yet diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala index 2827103..296a78b 100644 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ b/core/src/main/scala/kafka/network/Transmission.scala @@ -41,26 +41,7 @@ private[network] trait Transmission extends Logging { } -/** - * A transmission that is being received from a channel - */ -trait Receive extends Transmission { - - def buffer: ByteBuffer - - def readFrom(channel: ReadableByteChannel): Int - - def readCompletely(channel: ReadableByteChannel): Int = { - var totalRead = 0 - while(!complete) { - val read = readFrom(channel) - trace(read + " bytes read.") - totalRead += read - } - totalRead - } - -} + /** * A transmission that is being sent out to the channel diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0f09951..a19058e 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -18,10 +18,11 @@ package kafka.producer import kafka.api._ -import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} +import kafka.network.{BlockingChannel, BoundedByteBufferSend} import kafka.utils._ import java.util.Random +import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.utils.Utils._ object SyncProducer { @@ -63,12 +64,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = { + private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = { lock synchronized { verifyRequest(request) getOrMakeConnection() - var response: Receive = null + var response: NetworkReceive = null try { blockingChannel.send(request) if(readResponse) @@ -95,7 +96,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) - var response: Receive = null + var response: NetworkReceive = null val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { @@ -104,14 +105,14 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } if(producerRequest.requiredAcks != 0) - ProducerResponse.readFrom(response.buffer) + ProducerResponse.readFrom(response.payload()) else null } def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = doSend(request) - TopicMetadataResponse.readFrom(response.buffer) + TopicMetadataResponse.readFrom(response.payload()) } def close() = { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c63f4ba..a57e0b8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -26,13 +26,15 @@ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File +import org.apache.kafka.common.network.NetworkReceive + import collection.mutable import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} -import kafka.network.{Receive, BlockingChannel, SocketServer} +import kafka.network.{BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import kafka.coordinator.ConsumerCoordinator @@ -262,14 +264,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // 2. issue a controlled shutdown to the controller if (channel != null) { - var response: Receive = null + var response: NetworkReceive = null try { // send the controlled shutdown request val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId) channel.send(request) response = channel.receive() - val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) + val shutdownResponse = ControlledShutdownResponse.readFrom(response.payload()) if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) { shutdownSucceeded = true diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d2bac85..0740b3a 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -162,7 +162,7 @@ object ConsumerOffsetChecker extends Logging { debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) debug("Received offset fetch response %s.".format(offsetFetchResponse)) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 9881bd3..b708cd6 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -72,7 +72,7 @@ object TestOffsetManager { offsetsChannel.send(commitRequest) numCommits.getAndIncrement commitTimer.time { - val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) + val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement } offset += 1 @@ -119,7 +119,7 @@ object TestOffsetManager { val group = "group-" + id try { metadataChannel.send(ConsumerMetadataRequest(group)) - val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinatorOpt.map(_.id).getOrElse(-1) + val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) val channel = if (channels.contains(coordinatorId)) channels(coordinatorId) @@ -135,7 +135,7 @@ object TestOffsetManager { channel.send(fetchRequest) fetchTimer.time { - val response = OffsetFetchResponse.readFrom(channel.receive().buffer) + val response = OffsetFetchResponse.readFrom(channel.receive().payload()) if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) { numErrors.getAndIncrement } -- 1.9.5 (Apple Git-50.3) From d4e2c082423f2e8b208659a5417d5b7ff08b1c1d Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 9 Apr 2015 18:11:24 -0700 Subject: [PATCH 2/3] first pass on replacing Send --- .../kafka/common/network/ByteBufferSend.java | 2 +- .../apache/kafka/common/network/NetworkSend.java | 6 ++ .../java/org/apache/kafka/common/network/Send.java | 2 +- .../apache/kafka/common/requests/ResponseSend.java | 43 +++++++++++++ .../scala/kafka/api/ConsumerMetadataRequest.scala | 7 ++- .../kafka/api/ControlledShutdownRequest.scala | 9 ++- core/src/main/scala/kafka/api/FetchResponse.scala | 49 ++++++++++----- .../main/scala/kafka/api/LeaderAndIsrRequest.scala | 12 ++-- .../main/scala/kafka/api/OffsetCommitRequest.scala | 10 +-- .../main/scala/kafka/api/OffsetFetchRequest.scala | 15 ++--- core/src/main/scala/kafka/api/OffsetRequest.scala | 7 ++- .../src/main/scala/kafka/api/ProducerRequest.scala | 7 ++- .../main/scala/kafka/api/StopReplicaRequest.scala | 4 +- .../scala/kafka/api/TopicMetadataRequest.scala | 8 ++- .../scala/kafka/api/UpdateMetadataRequest.scala | 4 +- .../scala/kafka/javaapi/TopicMetadataRequest.scala | 7 +-- .../main/scala/kafka/network/BlockingChannel.scala | 6 +- .../kafka/network/BoundedByteBufferSend.scala | 71 ---------------------- .../main/scala/kafka/network/ByteBufferSend.scala | 40 ------------ core/src/main/scala/kafka/network/Handler.scala | 2 +- .../main/scala/kafka/network/RequestChannel.scala | 1 + .../kafka/network/RequestOrResponseSend.scala | 57 +++++++++++++++++ .../main/scala/kafka/network/SocketServer.scala | 2 +- .../main/scala/kafka/network/Transmission.scala | 40 +++++------- .../main/scala/kafka/producer/SyncProducer.scala | 8 +-- core/src/main/scala/kafka/server/KafkaApis.scala | 37 ++++++----- .../main/scala/kafka/server/MessageSetSend.scala | 17 ++++-- .../unit/kafka/network/SocketServerTest.scala | 3 +- 28 files changed, 251 insertions(+), 225 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java delete mode 100644 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala delete mode 100644 core/src/main/scala/kafka/network/ByteBufferSend.scala create mode 100644 core/src/main/scala/kafka/network/RequestOrResponseSend.scala diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index c8213e1..e828833 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -52,7 +52,7 @@ public class ByteBufferSend implements Send { } @Override - public int remaining() { + public long remaining() { return this.remaining; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java index 68327cd..d1ee959 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java @@ -23,10 +23,16 @@ import java.nio.ByteBuffer; */ public class NetworkSend extends ByteBufferSend { + public final static int UNKNOWN_DEST = -1; + public NetworkSend(int destination, ByteBuffer... buffers) { super(destination, sizeDelimit(buffers)); } + public NetworkSend(ByteBuffer... buffers) { + this(UNKNOWN_DEST, buffers); + } + private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) { int size = 0; for (int i = 0; i < buffers.length; i++) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java index 5d321a0..234945b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Send.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java @@ -29,7 +29,7 @@ public interface Send { /** * The number of bytes remaining to send */ - public int remaining(); + public long remaining(); /** * Is this send complete? diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java new file mode 100644 index 0000000..048119f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class ResponseSend extends NetworkSend { + + private final static int UNKNOWN_DEST = -1; + + public ResponseSend(int destination, ResponseHeader header, Struct body) { + super(destination, serialize(header, body)); + } + + public ResponseSend(ResponseHeader header, AbstractRequestResponse response) { + this(UNKNOWN_DEST, header, response.toStruct()); + } + + private static ByteBuffer serialize(ResponseHeader header, Struct body) { + ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf()); + header.writeTo(buffer); + body.writeTo(buffer); + buffer.rewind(); + return buffer; + } +} diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala index a3b1b78..05cda52 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala @@ -18,9 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.network.RequestChannel.Response + import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} +import kafka.network.RequestChannel.Response object ConsumerMetadataRequest { val CurrentVersion = 0.shortValue @@ -64,7 +65,7 @@ case class ConsumerMetadataRequest(group: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { // return ConsumerCoordinatorNotAvailable for all uncaught errors val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } def describe(details: Boolean) = { diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index 5be393a..ff4ddb9 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -18,10 +18,9 @@ package kafka.api import java.nio.ByteBuffer -import kafka.api.ApiUtils._ -import collection.mutable.ListBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.{TopicAndPartition, ErrorMapping} + +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging @@ -63,7 +62,7 @@ case class ControlledShutdownRequest(val versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition]) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean = false): String = { diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 75aaf57..002b17c 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -22,8 +22,9 @@ import java.nio.channels.GatheringByteChannel import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} -import kafka.network.{MultiSend, Send} +import kafka.network.MultiSend import kafka.api.ApiUtils._ +import org.apache.kafka.common.network.Send import scala.collection._ @@ -62,9 +63,15 @@ class PartitionDataSend(val partitionId: Int, buffer.putInt(partitionData.messages.sizeInBytes) buffer.rewind() - override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize - override def writeTo(channel: GatheringByteChannel): Int = { + override def destination: Int = -1 + + override def reify = null + + override def remaining: Long = buffer.remaining().toLong + (messageSize - messagesSentSize) + + override def writeTo(channel: GatheringByteChannel): Long = { var written = 0 if(buffer.hasRemaining) written += channel.write(buffer) @@ -104,9 +111,15 @@ case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartiti class TopicDataSend(val topicData: TopicData) extends Send { private val size = topicData.sizeInBytes - private var sent = 0 + private var sent = 0L + + override def completed: Boolean = sent >= size + + override def destination: Int = -1 + + override def reify = null - override def complete = sent >= size + override def remaining: Long = (size.toLong - sent) private val buffer = ByteBuffer.allocate(topicData.headerSize) writeShortString(buffer, topicData.topic) @@ -118,12 +131,12 @@ class TopicDataSend(val topicData: TopicData) extends Send { val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize } - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 + def writeTo(channel: GatheringByteChannel): Long = { + // TODO: expectIncomplete() + var written = 0L if(buffer.hasRemaining) written += channel.write(buffer) - if(!buffer.hasRemaining && !sends.complete) { + if(!buffer.hasRemaining && !sends.completed) { written += sends.writeTo(channel) } sent += written @@ -203,11 +216,17 @@ case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchR class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { private val size = fetchResponse.sizeInBytes - private var sent = 0 + private var sent = 0L private val sendSize = 4 /* for size */ + size - override def complete = sent >= sendSize + override def completed = sent >= sendSize + + override def destination = -1 + + override def reify = null + + override def remaining = sendSize - sent private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) buffer.putInt(size) @@ -222,12 +241,12 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize } - def writeTo(channel: GatheringByteChannel):Int = { - expectIncomplete() - var written = 0 + def writeTo(channel: GatheringByteChannel):Long = { + //TODO: expectIncomplete() + var written = 0L if(buffer.hasRemaining) written += channel.write(buffer) - if(!buffer.hasRemaining && !sends.complete) { + if(!buffer.hasRemaining && !sends.completed) { written += sends.writeTo(channel) } sent += written diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 2fad585..e03bdb0 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -19,14 +19,16 @@ package kafka.api import java.nio._ -import kafka.utils._ + import kafka.api.ApiUtils._ import kafka.cluster.BrokerEndPoint -import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import collection.Set +import kafka.utils._ + +import scala.collection.Set object LeaderAndIsr { @@ -184,7 +186,7 @@ case class LeaderAndIsrRequest (versionId: Short, case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } val errorResponse = LeaderAndIsrResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index cf8e6ac..226b8f8 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -18,11 +18,13 @@ package kafka.api import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import kafka.utils.{SystemTime, Logging} -import kafka.network.{RequestChannel, BoundedByteBufferSend} -import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition} +import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response +import kafka.utils.Logging + import scala.collection._ object OffsetCommitRequest extends Logging { @@ -161,7 +163,7 @@ case class OffsetCommitRequest(groupId: String, val commitStatus = requestInfo.mapValues(_ => errorCode) val commitResponse = OffsetCommitResponse(commitStatus, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(commitResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index 67811a7..f179903 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -17,16 +17,13 @@ package kafka.api +import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import kafka.utils.Logging -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common._ -import kafka.common.TopicAndPartition +import kafka.common.{TopicAndPartition, _} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response - -import scala.Some - -import java.nio.ByteBuffer +import kafka.utils.Logging object OffsetFetchRequest extends Logging { val CurrentVersion: Short = 0 @@ -99,7 +96,7 @@ case class OffsetFetchRequest(groupId: String, )) }.toMap val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 3d483bc..b1bf46a 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -18,9 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.common.{ErrorMapping, TopicAndPartition} + import kafka.api.ApiUtils._ -import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response @@ -117,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) } val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 570b2da..f5167e4 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -18,11 +18,12 @@ package kafka.api import java.nio._ -import kafka.message._ + import kafka.api.ApiUtils._ import kafka.common._ +import kafka.message._ +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import kafka.network.{RequestChannel, BoundedByteBufferSend} object ProducerRequest { val CurrentVersion = 0.shortValue @@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) } val errorResponse = ProducerResponse(correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } } diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 5e14987..aee6973 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio._ import kafka.api.ApiUtils._ -import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException} +import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException} import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.network.RequestChannel.Response import kafka.utils.Logging @@ -106,7 +106,7 @@ case class StopReplicaRequest(versionId: Short, case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) }.toMap val errorResponse = StopReplicaResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 7dca09c..3cdbd40 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -18,13 +18,15 @@ package kafka.api import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import collection.mutable.ListBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging +import scala.collection.mutable.ListBuffer + object TopicMetadataRequest extends Logging { val CurrentVersion = 0.shortValue val DefaultClientId = "" @@ -80,7 +82,7 @@ case class TopicMetadataRequest(val versionId: Short, topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index 69f0397..e463395 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -21,8 +21,8 @@ import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.cluster.{Broker, BrokerEndPoint} import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import kafka.network.{BoundedByteBufferSend, RequestChannel} import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.Set @@ -128,7 +128,7 @@ case class UpdateMetadataRequest (versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]])) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(errorResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index b0b7be1..568d0ac 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -16,12 +16,11 @@ */ package kafka.javaapi -import kafka.api._ import java.nio.ByteBuffer + +import kafka.api._ + import scala.collection.mutable -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import kafka.network.RequestChannel.Response class TopicMetadataRequest(val versionId: Short, val correlationId: Int, diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 7c0d8fc..7415f36 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -21,7 +21,7 @@ import java.net.InetSocketAddress import java.nio.channels._ import kafka.utils.{nonthreadsafe, Logging} import kafka.api.RequestOrResponse -import org.apache.kafka.common.network.NetworkReceive +import org.apache.kafka.common.network.{NetworkSend, NetworkReceive} object BlockingChannel{ @@ -96,11 +96,11 @@ class BlockingChannel( val host: String, def isConnected = connected - def send(request: RequestOrResponse):Int = { + def send(request: RequestOrResponse): Long = { if(!connected) throw new ClosedChannelException() - val send = new BoundedByteBufferSend(request) + val send = new RequestOrResponseSend(request) send.writeCompletely(writeChannel) } diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala deleted file mode 100644 index b95b73b..0000000 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ -import kafka.api.RequestOrResponse -import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} - -@nonthreadsafe -private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { - - private val sizeBuffer = ByteBuffer.allocate(4) - - // Avoid possibility of overflow for 2GB-4 byte buffer - if(buffer.remaining > Int.MaxValue - sizeBuffer.limit) - throw new IllegalStateException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " + - "allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".") - sizeBuffer.putInt(buffer.limit) - sizeBuffer.rewind() - - var complete: Boolean = false - - def this(size: Int) = this(ByteBuffer.allocate(size)) - - def this(request: RequestOrResponse) = { - this(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) - request.requestId match { - case Some(requestId) => - buffer.putShort(requestId) - case None => - } - - request.writeTo(buffer) - buffer.rewind() - } - - def this(header: ResponseHeader, body: AbstractRequestResponse) = { - this(header.sizeOf + body.sizeOf) - header.writeTo(buffer) - body.writeTo(buffer) - buffer.rewind - } - - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - val written = channel.write(Array(sizeBuffer, buffer)) - // if we are done, mark it off - if(!buffer.hasRemaining) - complete = true - written.asInstanceOf[Int] - } - -} diff --git a/core/src/main/scala/kafka/network/ByteBufferSend.scala b/core/src/main/scala/kafka/network/ByteBufferSend.scala deleted file mode 100644 index af30042..0000000 --- a/core/src/main/scala/kafka/network/ByteBufferSend.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ - -@nonthreadsafe -private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send { - - var complete: Boolean = false - - def this(size: Int) = this(ByteBuffer.allocate(size)) - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - written += channel.write(buffer) - if(!buffer.hasRemaining) - complete = true - written - } - -} diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala index cd1d143..2148e0c 100644 --- a/core/src/main/scala/kafka/network/Handler.scala +++ b/core/src/main/scala/kafka/network/Handler.scala @@ -17,7 +17,7 @@ package kafka.network -import org.apache.kafka.common.network.NetworkReceive +import org.apache.kafka.common.network.{Send, NetworkReceive} private[kafka] object Handler { diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1d9c57b..47d9a5d 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,7 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala new file mode 100644 index 0000000..424258f --- /dev/null +++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.nio.ByteBuffer +import java.nio.channels.GatheringByteChannel + +import kafka.api.RequestOrResponse +import kafka.utils.Logging +import org.apache.kafka.common.network.NetworkSend + +object RequestOrResponseSend { + def serialize(request: RequestOrResponse): ByteBuffer = { + val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) + request.requestId match { + case Some(requestId) => + buffer.putShort(requestId) + case None => + } + request.writeTo(buffer) + buffer.rewind() + buffer + } +} + +class RequestOrResponseSend(val buffer: ByteBuffer) extends NetworkSend(buffer) with Logging { + + def this(request: RequestOrResponse) { + this(RequestOrResponseSend.serialize(request)) + } + + def writeCompletely(channel: GatheringByteChannel): Long = { + var totalWritten = 0L + while(!completed()) { + val written = writeTo(channel) + trace(written + " bytes written.") + totalWritten += written + } + totalWritten + } + +} diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 3998eb4..28bd4c3 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -513,7 +513,7 @@ private[kafka] class Processor(val id: Int, throw new IllegalStateException("Registered for write interest but no response attached to key.") val written = responseSend.writeTo(socketChannel) trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key) - if(responseSend.complete) { + if(responseSend.completed()) { response.request.updateRequestMetrics() key.attach(null) trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala index 296a78b..4c7a557 100644 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ b/core/src/main/scala/kafka/network/Transmission.scala @@ -21,6 +21,7 @@ import java.nio._ import java.nio.channels._ import kafka.utils.Logging import kafka.common.KafkaException +import org.apache.kafka.common.network.Send /** * Represents a stateful transfer of data to or from the network @@ -43,55 +44,44 @@ private[network] trait Transmission extends Logging { -/** - * A transmission that is being sent out to the channel - */ -trait Send extends Transmission { - - def writeTo(channel: GatheringByteChannel): Int - def writeCompletely(channel: GatheringByteChannel): Int = { - var totalWritten = 0 - while(!complete) { - val written = writeTo(channel) - trace(written + " bytes written.") - totalWritten += written - } - totalWritten - } - -} /** * A set of composite sends, sent one after another */ -abstract class MultiSend[S <: Send](val sends: List[S]) extends Send { +abstract class MultiSend[S <: Send](val sends: List[S]) extends Send with Logging { val expectedBytesToWrite: Int private var current = sends - var totalWritten = 0 + var totalWritten = 0L + + override def destination = -1; + + override def reify = null; + + override def remaining = (expectedBytesToWrite - totalWritten) /** * This method continues to write to the socket buffer till an incomplete * write happens. On an incomplete write, it returns to the caller to give it * a chance to schedule other work till the buffered write completes. */ - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete - var totalWrittenPerCall = 0 + def writeTo(channel: GatheringByteChannel): Long = { + //TODO: Is this needed? expectIncomplete + var totalWrittenPerCall = 0L var sendComplete: Boolean = false do { val written = current.head.writeTo(channel) totalWritten += written totalWrittenPerCall += written - sendComplete = current.head.complete + sendComplete = current.head.completed() if(sendComplete) current = current.tail - } while (!complete && sendComplete) + } while (!completed && sendComplete) trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + expectedBytesToWrite) totalWrittenPerCall } - def complete: Boolean = { + def completed: Boolean = { if (current == Nil) { if (totalWritten != expectedBytesToWrite) error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index a19058e..2a6c91e 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -17,11 +17,11 @@ package kafka.producer -import kafka.api._ -import kafka.network.{BlockingChannel, BoundedByteBufferSend} -import kafka.utils._ import java.util.Random +import kafka.api._ +import kafka.network.{RequestOrResponseSend, BlockingChannel} +import kafka.utils._ import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.utils.Utils._ @@ -51,7 +51,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level */ if (logger.isDebugEnabled) { - val buffer = new BoundedByteBufferSend(request).buffer + val buffer = new RequestOrResponseSend(request).buffer trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() if(requestTypeId == RequestKeys.ProduceKey) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b4004aa..feea236 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,8 +17,16 @@ package kafka.server + +import kafka.api.ConsumerMetadataRequest +import kafka.api.ConsumerMetadataResponse +import kafka.api.FetchRequest +import kafka.api.FetchResponse +import kafka.api.OffsetCommitRequest +import kafka.api.OffsetCommitResponse +import kafka.api.OffsetFetchRequest +import kafka.api.OffsetFetchResponse import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ import kafka.admin.AdminUtils @@ -29,6 +37,7 @@ import kafka.log._ import kafka.network._ import kafka.network.RequestChannel.Response import kafka.utils.{SystemTime, Logging} +import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend} import scala.collection._ @@ -84,7 +93,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (response == null) requestChannel.closeConnection(request.processor, request) else - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + requestChannel.sendResponse(new Response(request, new ResponseSend(respHeader, response))) } error("error when handling request %s".format(request.requestObj), e) } finally @@ -99,7 +108,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(leaderAndIsrResponse))) } catch { case e: KafkaStorageException => fatal("Disk error during leadership change.", e) @@ -114,7 +123,7 @@ class KafkaApis(val requestChannel: RequestChannel, val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(stopReplicaResponse))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() } @@ -123,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(updateMetadataResponse))) } def handleControlledShutdownRequest(request: RequestChannel.Request) { @@ -134,7 +143,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.NoError, partitionsRemaining) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(controlledShutdownResponse))) } @@ -158,7 +167,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(response))) } // compute the retention time based on the request version: @@ -234,7 +243,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(response))) } } @@ -337,7 +346,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = OffsetResponse(offsetRequest.correlationId, responseMap) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(response))) } def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { @@ -440,7 +449,7 @@ class KafkaApis(val requestChannel: RequestChannel, val brokers = metadataCache.getAliveBrokers trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(response))) } /* @@ -464,7 +473,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending offset fetch response %s for correlation id %d to client %s." .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(response))) } /* @@ -489,7 +498,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending consumer metadata %s for correlation id %d to client %s." .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(response))) } def handleJoinGroupRequest(request: RequestChannel.Request) { @@ -502,7 +511,7 @@ class KafkaApis(val requestChannel: RequestChannel, def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer val responseBody = new JoinGroupResponse(errorCode, generationId, joinGroupRequest.consumerId, partitionList) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(respHeader, responseBody))) } // let the coordinator to handle join-group @@ -522,7 +531,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { val response = new HeartbeatResponse(errorCode) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(respHeader, response))) } // let the coordinator to handle heartbeat diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala b/core/src/main/scala/kafka/server/MessageSetSend.scala index 5667648..484c5d8 100644 --- a/core/src/main/scala/kafka/server/MessageSetSend.scala +++ b/core/src/main/scala/kafka/server/MessageSetSend.scala @@ -23,13 +23,14 @@ import kafka.network._ import kafka.message._ import kafka.utils._ import kafka.common.ErrorMapping +import org.apache.kafka.common.network.Send /** * A zero-copy message response that writes the bytes needed directly from the file * wholly in kernel space */ @nonthreadsafe -private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send { +private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send with Logging{ private var sent: Int = 0 private val size: Int = messages.sizeInBytes @@ -38,14 +39,20 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Sh header.putShort(errorCode) header.rewind() - var complete: Boolean = false + var completed: Boolean = false def this(messages: MessageSet) = this(messages, ErrorMapping.NoError) def this() = this(MessageSet.Empty) - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() + override def destination = -1 + + override def reify = null; + + override def remaining = size - sent + + def writeTo(channel: GatheringByteChannel): Long = { + //TODO: expectIncomplete() var written = 0 if(header.hasRemaining) written += channel.write(header) @@ -62,7 +69,7 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Sh } if(sent >= size) - complete = true + completed = true written } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 95d5621..6fa4f12 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -20,6 +20,7 @@ package kafka.network; import java.net._ import java.io._ import kafka.cluster.EndPoint +import org.apache.kafka.common.network.NetworkSend import org.apache.kafka.common.protocol.SecurityProtocol import org.junit._ import org.scalatest.junit.JUnitSuite @@ -71,7 +72,7 @@ class SocketServerTest extends JUnitSuite { val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes) request.requestObj.writeTo(byteBuffer) byteBuffer.rewind() - val send = new BoundedByteBufferSend(byteBuffer) + val send = new NetworkSend(byteBuffer) channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } -- 1.9.5 (Apple Git-50.3) From 12fff39132fbd45e120645400f3b311fcec49475 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 9 Apr 2015 20:44:27 -0700 Subject: [PATCH 3/3] implement maxSize and improved docs --- .../main/java/org/apache/kafka/common/network/NetworkReceive.java | 7 ++++++- .../main/java/org/apache/kafka/common/requests/ResponseSend.java | 2 -- core/src/main/scala/kafka/network/BlockingChannel.scala | 2 ++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index 5207807..cf97ea5 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -72,13 +72,15 @@ public class NetworkReceive implements Receive { return new ByteBuffer[] {this.buffer}; } - @Override + public long readFrom(ScatteringByteChannel channel) throws IOException { return readFromReadableChannel(channel); } // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work + // This can go away after we get rid of BlockingChannel + @Deprecated public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; if (size.hasRemaining()) { @@ -91,6 +93,8 @@ public class NetworkReceive implements Receive { int requestSize = size.getInt(); if (requestSize < 0) throw new IllegalStateException("Invalid request (size = " + requestSize + ")"); + if (maxSize != UNLIMITED && requestSize > maxSize) + throw new IllegalStateException("Invalid request (size = " + requestSize + " larger than " + maxSize + ")"); this.buffer = ByteBuffer.allocate(requestSize); } } @@ -109,6 +113,7 @@ public class NetworkReceive implements Receive { } // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel + @Deprecated public long readCompletely(ReadableByteChannel channel) throws IOException { int totalRead = 0; while (!complete()) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java index 048119f..c510559 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java @@ -23,8 +23,6 @@ import java.nio.ByteBuffer; public class ResponseSend extends NetworkSend { - private final static int UNKNOWN_DEST = -1; - public ResponseSend(int destination, ResponseHeader header, Struct body) { super(destination, serialize(header, body)); } diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 7415f36..45adb92 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -60,6 +60,8 @@ class BlockingChannel( val host: String, channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) writeChannel = channel + // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout + // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work readChannel = Channels.newChannel(channel.socket().getInputStream) connected = true // settings may not match what we requested above -- 1.9.5 (Apple Git-50.3)