diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index ad6a20d..a454e4a 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -58,6 +58,15 @@ case class ControlledShutdownRequest(val versionId: Short, } override def toString(): String = { + describeRequest(true) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition]) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + override def describeRequest(details: Boolean = false): String = { val controlledShutdownRequest = new StringBuilder controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName) controlledShutdownRequest.append("; Version: " + versionId) @@ -65,9 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short, controlledShutdownRequest.append("; BrokerId: " + brokerId) controlledShutdownRequest.toString() } - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition]) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - } -} +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index d41a705..ed89d6d 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -141,16 +141,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV def numPartitions = requestInfo.size override def toString(): String = { - val fetchRequest = new StringBuilder - fetchRequest.append("Name: " + this.getClass.getSimpleName) - fetchRequest.append("; Version: " + versionId) - fetchRequest.append("; CorrelationId: " + correlationId) - fetchRequest.append("; ClientId: " + clientId) - fetchRequest.append("; ReplicaId: " + replicaId) - fetchRequest.append("; MaxWait: " + maxWait + " ms") - fetchRequest.append("; MinBytes: " + minBytes + " bytes") - fetchRequest.append("; RequestInfo: " + requestInfo.mkString(",")) - fetchRequest.toString() + describeRequest(true) } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { @@ -161,8 +152,21 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) } -} + override def describeRequest(details: Boolean): String = { + val fetchRequest = new StringBuilder + fetchRequest.append("Name: " + this.getClass.getSimpleName) + fetchRequest.append("; Version: " + versionId) + fetchRequest.append("; CorrelationId: " + correlationId) + fetchRequest.append("; ClientId: " + clientId) + fetchRequest.append("; ReplicaId: " + replicaId) + fetchRequest.append("; MaxWait: " + maxWait + " ms") + fetchRequest.append("; MinBytes: " + minBytes + " bytes") + if(details) + fetchRequest.append("; RequestInfo: " + requestInfo.mkString(",")) + fetchRequest.toString() + } +} @nonthreadsafe class FetchRequestBuilder() { diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 3401afa..93d3fbb 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -173,6 +173,18 @@ case class LeaderAndIsrRequest (versionId: Short, } override def toString(): String = { + describeRequest(true) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val responseMap = partitionStateInfos.map { + case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + } + val errorResponse = LeaderAndIsrResponse(correlationId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + override def describeRequest(details: Boolean): String = { val leaderAndIsrRequest = new StringBuilder leaderAndIsrRequest.append("Name:" + this.getClass.getSimpleName) leaderAndIsrRequest.append(";Version:" + versionId) @@ -180,16 +192,9 @@ case class LeaderAndIsrRequest (versionId: Short, leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch) leaderAndIsrRequest.append(";CorrelationId:" + correlationId) leaderAndIsrRequest.append(";ClientId:" + clientId) - leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(",")) leaderAndIsrRequest.append(";Leaders:" + leaders.mkString(",")) + if(details) + leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(",")) leaderAndIsrRequest.toString() } - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val responseMap = partitionStateInfos.map { - case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - } - val errorResponse = LeaderAndIsrResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 1cbe6e8..e7e1b8e 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -106,4 +106,20 @@ case class OffsetCommitRequest(groupId: String, val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } + + override def describeRequest(details: Boolean): String = { + val offsetCommitRequest = new StringBuilder + offsetCommitRequest.append("Name: " + this.getClass.getSimpleName) + offsetCommitRequest.append("; Version: " + versionId) + offsetCommitRequest.append("; CorrelationId: " + correlationId) + offsetCommitRequest.append("; ClientId: " + clientId) + offsetCommitRequest.append("; GroupId: " + groupId) + if(details) + offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(",")) + offsetCommitRequest.toString() + } + + override def toString(): String = { + describeRequest(true) + } } diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index a4c5623..5aace7b 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -98,4 +98,20 @@ case class OffsetFetchRequest(groupId: String, val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } -} + + override def describeRequest(details: Boolean): String = { + val offsetFetchRequest = new StringBuilder + offsetFetchRequest.append("Name: " + this.getClass.getSimpleName) + offsetFetchRequest.append("; Version: " + versionId) + offsetFetchRequest.append("; CorrelationId: " + correlationId) + offsetFetchRequest.append("; ClientId: " + clientId) + offsetFetchRequest.append("; GroupId: " + groupId) + if(details) + offsetFetchRequest.append("; RequestInfo: " + requestInfo.mkString(",")) + offsetFetchRequest.toString() + } + + override def toString(): String = { + describeRequest(true) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 0a94a6c..571c41c 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -108,14 +108,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId override def toString(): String = { - val offsetRequest = new StringBuilder - offsetRequest.append("Name: " + this.getClass.getSimpleName) - offsetRequest.append("; Version: " + versionId) - offsetRequest.append("; CorrelationId: " + correlationId) - offsetRequest.append("; ClientId: " + clientId) - offsetRequest.append("; RequestInfo: " + requestInfo.mkString(",")) - offsetRequest.append("; ReplicaId: " + replicaId) - offsetRequest.toString() + describeRequest(true) } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { @@ -126,4 +119,16 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } -} + + override def describeRequest(details: Boolean): String = { + val offsetRequest = new StringBuilder + offsetRequest.append("Name: " + this.getClass.getSimpleName) + offsetRequest.append("; Version: " + versionId) + offsetRequest.append("; CorrelationId: " + correlationId) + offsetRequest.append("; ClientId: " + clientId) + offsetRequest.append("; ReplicaId: " + replicaId) + if(details) + offsetRequest.append("; RequestInfo: " + requestInfo.mkString(",")) + offsetRequest.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index c606351..04bd3af 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -123,15 +123,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, def numPartitions = data.size override def toString(): String = { - val producerRequest = new StringBuilder - producerRequest.append("Name: " + this.getClass.getSimpleName) - producerRequest.append("; Version: " + versionId) - producerRequest.append("; CorrelationId: " + correlationId) - producerRequest.append("; ClientId: " + clientId) - producerRequest.append("; RequiredAcks: " + requiredAcks) - producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") - producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(",")) - producerRequest.toString() + describeRequest(true) } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { @@ -148,6 +140,20 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } } + override def describeRequest(details: Boolean): String = { + val producerRequest = new StringBuilder + producerRequest.append("Name: " + this.getClass.getSimpleName) + producerRequest.append("; Version: " + versionId) + producerRequest.append("; CorrelationId: " + correlationId) + producerRequest.append("; ClientId: " + clientId) + producerRequest.append("; RequiredAcks: " + requiredAcks) + producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms") + if(details) + producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(",")) + producerRequest.toString() + } + + def emptyData(){ data.clear() } diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index ba59c31..b6351a4 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -30,12 +30,18 @@ object Request { } -private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging{ +private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging { def sizeInBytes: Int def writeTo(buffer: ByteBuffer): Unit def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {} + + /* The purpose of this API is to return a string description of the Request mainly for the purpose of request logging. + * This API has no meaning for a Response object. + * @param details If this is false, omit the parts of the request description that are proportional to the number of + * topics or partitions. This is mainly to control the amount of request logging. */ + def describeRequest(details: Boolean):String = { null } } diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index efd7046..461981c 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -98,6 +98,18 @@ case class StopReplicaRequest(versionId: Short, } override def toString(): String = { + describeRequest(true) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val responseMap = partitions.map { + case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + }.toMap + val errorResponse = StopReplicaResponse(correlationId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + override def describeRequest(details: Boolean): String = { val stopReplicaRequest = new StringBuilder stopReplicaRequest.append("Name: " + this.getClass.getSimpleName) stopReplicaRequest.append("; Version: " + versionId) @@ -106,15 +118,8 @@ case class StopReplicaRequest(versionId: Short, stopReplicaRequest.append("; DeletePartitions: " + deletePartitions) stopReplicaRequest.append("; ControllerId: " + controllerId) stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch) - stopReplicaRequest.append("; Partitions: " + partitions.mkString(",")) + if(details) + stopReplicaRequest.append("; Partitions: " + partitions.mkString(",")) stopReplicaRequest.toString() } - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val responseMap = partitions.map { - case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) - }.toMap - val errorResponse = StopReplicaResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - } -} +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index c5221c4..e6d710d 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -72,13 +72,7 @@ case class TopicMetadataRequest(val versionId: Short, } override def toString(): String = { - val topicMetadataRequest = new StringBuilder - topicMetadataRequest.append("Name: " + this.getClass.getSimpleName) - topicMetadataRequest.append("; Version: " + versionId) - topicMetadataRequest.append("; CorrelationId: " + correlationId) - topicMetadataRequest.append("; ClientId: " + clientId) - topicMetadataRequest.append("; Topics: " + topics.mkString(",")) - topicMetadataRequest.toString() + describeRequest(true) } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { @@ -88,4 +82,15 @@ case class TopicMetadataRequest(val versionId: Short, val errorResponse = TopicMetadataResponse(topicMetadata, correlationId) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) } -} + + override def describeRequest(details: Boolean): String = { + val topicMetadataRequest = new StringBuilder + topicMetadataRequest.append("Name: " + this.getClass.getSimpleName) + topicMetadataRequest.append("; Version: " + versionId) + topicMetadataRequest.append("; CorrelationId: " + correlationId) + topicMetadataRequest.append("; ClientId: " + clientId) + if(details) + topicMetadataRequest.append("; Topics: " + topics.mkString(",")) + topicMetadataRequest.toString() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index 2ead364..1679ab0 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -101,6 +101,15 @@ case class UpdateMetadataRequest (versionId: Short, } override def toString(): String = { + describeRequest(true) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]])) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } + + override def describeRequest(details: Boolean): String = { val updateMetadataRequest = new StringBuilder updateMetadataRequest.append("Name:" + this.getClass.getSimpleName) updateMetadataRequest.append(";Version:" + versionId) @@ -108,13 +117,9 @@ case class UpdateMetadataRequest (versionId: Short, updateMetadataRequest.append(";ControllerEpoch:" + controllerEpoch) updateMetadataRequest.append(";CorrelationId:" + correlationId) updateMetadataRequest.append(";ClientId:" + clientId) - updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(",")) updateMetadataRequest.append(";AliveBrokers:" + aliveBrokers.mkString(",")) + if(details) + updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(",")) updateMetadataRequest.toString() } - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]])) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) - } -} +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 330d3a0..5b53bad 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -83,7 +83,11 @@ object RequestChannel extends Logging { } if(requestLogger.isTraceEnabled) requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(requestObj, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + .format(requestObj.describeRequest(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + else if(requestLogger.isDebugEnabled) { + requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" + .format(requestObj.describeRequest(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + } } }