From 3561ba4fc42332b87cbb768a6143f461e93a6eba Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Sat, 25 Oct 2014 22:12:09 -0700 Subject: [PATCH] add Session concept in SocketServer --- .../main/scala/kafka/network/RequestChannel.scala | 7 ++-- .../main/scala/kafka/network/SocketServer.scala | 38 ++++++++++++++++------ 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 4560d8f..2f0bbae 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -17,6 +17,7 @@ package kafka.network +import java.security.Principal import java.util.concurrent._ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge @@ -30,7 +31,7 @@ import org.apache.log4j.Logger object RequestChannel extends Logging { - val AllDone = new Request(1, 2, getShutdownReceive(), 0) + val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0) def getShutdownReceive() = { val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) @@ -41,7 +42,7 @@ object RequestChannel extends Logging { byteBuffer } - case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) { + case class Request(processor: Int, requestKey: Any, session: Session = null, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) { @volatile var requestDequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @volatile var responseCompleteTimeMs = -1L @@ -90,6 +91,8 @@ object RequestChannel extends Logging { } } } + + case class Session(principal: Principal, host: String) case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) { request.responseCompleteTimeMs = SystemTime.milliseconds diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index cee76b3..d769aed 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -17,6 +17,7 @@ package kafka.network +import java.security.Principal import java.util import java.util.concurrent._ import java.util.concurrent.atomic._ @@ -24,6 +25,8 @@ import java.net._ import java.io._ import java.nio.channels._ +import com.sun.security.auth.UserPrincipal + import scala.collection._ import kafka.common.KafkaException @@ -378,6 +381,7 @@ private[kafka] class Processor(val id: Int, var curr = requestChannel.receiveResponse(id) while(curr != null) { val key = curr.request.requestKey.asInstanceOf[SelectionKey] + val cnxn = key.attachment().asInstanceOf[KafkaCnxn] try { curr.responseAction match { case RequestChannel.NoOpAction => { @@ -386,12 +390,14 @@ private[kafka] class Processor(val id: Int, curr.request.updateRequestMetrics trace("Socket server received empty response to send, registering for read: " + curr) key.interestOps(SelectionKey.OP_READ) - key.attach(null) + cnxn.response = null + key.attach(cnxn) } case RequestChannel.SendAction => { trace("Socket server received response to send, registering for write: " + curr) key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + cnxn.response = curr + key.attach(cnxn) } case RequestChannel.CloseConnectionAction => { curr.request.updateRequestMetrics @@ -426,7 +432,10 @@ private[kafka] class Processor(val id: Int, while(newConnections.size() > 0) { val channel = newConnections.poll() debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) - channel.register(selector, SelectionKey.OP_READ) + val key = channel.register(selector, SelectionKey.OP_READ) + + val cnxn = new KafkaCnxn(principal = new UserPrincipal("nobody"), transmit = null, response = null); + key.attach(cnxn) } } @@ -436,20 +445,25 @@ private[kafka] class Processor(val id: Int, def read(key: SelectionKey) { lruConnections.put(key, currentTimeNanos) val socketChannel = channelFor(key) - var receive = key.attachment.asInstanceOf[Receive] - if(key.attachment == null) { + val cnxn = key.attachment.asInstanceOf[KafkaCnxn] + var receive = cnxn.transmit.asInstanceOf[Receive] + if(receive == null) { receive = new BoundedByteBufferReceive(maxRequestSize) - key.attach(receive) + cnxn.transmit = receive + key.attach(cnxn) } val read = receive.readFrom(socketChannel) val address = socketChannel.socket.getRemoteSocketAddress(); + val hostname = socketChannel.socket.getInetAddress.getCanonicalHostName trace(read + " bytes read from " + address) if(read < 0) { close(key) } else if(receive.complete) { - val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) + val session = RequestChannel.Session(cnxn.principal,hostname) + val req = RequestChannel.Request(processor = id, session = session, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req) - key.attach(null) + cnxn.transmit = null + key.attach(cnxn) // explicitly reset interest ops to not READ, no need to wake up the selector just yet key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) } else { @@ -465,7 +479,8 @@ private[kafka] class Processor(val id: Int, */ def write(key: SelectionKey) { val socketChannel = channelFor(key) - val response = key.attachment().asInstanceOf[RequestChannel.Response] + val cnxn = key.attachment().asInstanceOf[KafkaCnxn] + val response = cnxn.response val responseSend = response.responseSend if(responseSend == null) throw new IllegalStateException("Registered for write interest but no response attached to key.") @@ -473,7 +488,8 @@ private[kafka] class Processor(val id: Int, trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key) if(responseSend.complete) { response.request.updateRequestMetrics() - key.attach(null) + cnxn.transmit = null + key.attach(cnxn) trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) } else { @@ -532,3 +548,5 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { } class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count)) + +case class KafkaCnxn(val principal: Principal, var transmit: Transmission, var response: RequestChannel.Response); \ No newline at end of file -- 1.9.3 (Apple Git-50)