diff --git a/config/consumer.properties b/config/consumer.properties
index 83847de..7343cbc 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -20,7 +20,7 @@
 zookeeper.connect=127.0.0.1:2181
 
 # timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
+zookeeper.connection.timeout.ms=1000000
 
 #consumer group id
 group.id=test-consumer-group
diff --git a/config/server.properties b/config/server.properties
index f16c84c..c9e923a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -37,16 +37,16 @@ port=9092
 #advertised.port=
 
 # The number of threads handling network requests
-num.network.threads=3
+num.network.threads=2
 
 # The number of threads doing disk I/O
 num.io.threads=8
 
 # The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
+socket.send.buffer.bytes=1048576
 
 # The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=65536
+socket.receive.buffer.bytes=1048576
 
 # The maximum size of a request that the socket server will accept (protection against OOM)
 socket.request.max.bytes=104857600
@@ -60,7 +60,7 @@ log.dirs=/tmp/kafka-logs
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
 # the brokers.
-num.partitions=1
+num.partitions=2
 
 ############################# Log Flush Policy #############################
 
@@ -94,11 +94,11 @@ log.retention.hours=168
 #log.retention.bytes=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
+log.segment.bytes=536870912
 
 # The interval at which log segments are checked to see if they can be deleted according
 # to the retention policies
-log.retention.check.interval.ms=300000
+log.retention.check.interval.ms=60000
 
 # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
@@ -114,4 +114,4 @@ log.cleaner.enable=false
 zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
+zookeeper.connection.timeout.ms=1000000
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 8e99de0..4976d9c 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -23,8 +23,6 @@ import java.net._
 import java.io._
 import java.nio.channels._
 
-import scala.collection._
-
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
@@ -43,9 +41,7 @@ class SocketServer(val brokerId: Int,
                    val maxQueuedRequests: Int,
                    val sendBufferSize: Int,
                    val recvBufferSize: Int,
-                   val maxRequestSize: Int = Int.MaxValue,
-                   val maxConnectionsPerIp: Int = Int.MaxValue,
-                   val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup {
+                   val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
@@ -59,23 +55,17 @@ class SocketServer(val brokerId: Int,
   /**
    * Start the socket server
    */
   def startup() {
-    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
     for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(i,
-                                    time,
-                                    maxRequestSize,
-                                    aggregateIdleMeter,
-                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
-                                    numProcessorThreads,
-                                    requestChannel,
-                                    quotas)
+      processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter,
+        newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+        numProcessorThreads, requestChannel)
       Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
     // start accepting connections
-    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
+    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
     Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
     acceptor.awaitStartup
     info("Started")
@@ -97,7 +87,7 @@ class SocketServer(val brokerId: Int,
 /**
  * A base class with some helper variables and methods
  */
-private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
+private[kafka] abstract class AbstractServerThread extends Runnable with Logging {
 
   protected val selector = Selector.open();
   private val startupLatch = new CountDownLatch(1)
@@ -141,48 +131,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    */
   def wakeup() = selector.wakeup()
 
-  /**
-   * Close the given key and associated socket
-   */
-  def close(key: SelectionKey) {
-    if(key != null) {
-      key.attach(null)
-      close(key.channel.asInstanceOf[SocketChannel])
-      swallowError(key.cancel())
-    }
-  }
-
-  def close(channel: SocketChannel) {
-    if(channel != null) {
-      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
-      connectionQuotas.dec(channel.socket.getInetAddress)
-      swallowError(channel.socket().close())
-      swallowError(channel.close())
-    }
-  }
-
-  /**
-   * Close all open connections
-   */
-  def closeAll() {
-    val iter = this.selector.keys().iterator()
-    while (iter.hasNext) {
-      val key = iter.next()
-      close(key)
-    }
-  }
-
 }
 
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val host: String,
-                              val port: Int,
-                              private val processors: Array[Processor],
-                              val sendBufferSize: Int,
-                              val recvBufferSize: Int,
-                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
+private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor],
+                              val sendBufferSize: Int, val recvBufferSize: Int) extends AbstractServerThread {
   val serverChannel = openServerSocket(host, port)
 
   /**
@@ -203,14 +158,14 @@ private[kafka] class Acceptor(val host: String,
             key = iter.next
             iter.remove()
             if(key.isAcceptable)
-               accept(key, processors(currentProcessor))
-            else
-               throw new IllegalStateException("Unrecognized key state for acceptor thread.")
+                accept(key, processors(currentProcessor))
+              else
+                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
 
-            // round robin to the next processor thread
-            currentProcessor = (currentProcessor + 1) % processors.length
+              // round robin to the next processor thread
+              currentProcessor = (currentProcessor + 1) % processors.length
           } catch {
-            case e: Throwable => error("Error while accepting connection", e)
+            case e: Throwable => error("Error in acceptor", e)
           }
         }
       }
@@ -232,7 +187,6 @@ private[kafka] class Acceptor(val host: String,
       new InetSocketAddress(host, port)
     val serverChannel = ServerSocketChannel.open()
     serverChannel.configureBlocking(false)
-    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
     try {
       serverChannel.socket.bind(socketAddress)
       info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port))
@@ -248,24 +202,19 @@ private[kafka] class Acceptor(val host: String,
    */
   def accept(key: SelectionKey, processor: Processor) {
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
+    serverSocketChannel.socket().setReceiveBufferSize(recvBufferSize)
+
     val socketChannel = serverSocketChannel.accept()
-    try {
-      connectionQuotas.inc(socketChannel.socket().getInetAddress)
-      socketChannel.configureBlocking(false)
-      socketChannel.socket().setTcpNoDelay(true)
-      socketChannel.socket().setSendBufferSize(sendBufferSize)
+    socketChannel.configureBlocking(false)
+    socketChannel.socket().setTcpNoDelay(true)
+    socketChannel.socket().setSendBufferSize(sendBufferSize)
 
-      debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
-            .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
+    debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
+          .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
                   socketChannel.socket.getSendBufferSize, sendBufferSize,
                   socketChannel.socket.getReceiveBufferSize, recvBufferSize))
 
-      processor.accept(socketChannel)
-    } catch {
-      case e: TooManyConnectionsException =>
-        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
-        close(socketChannel)
-    }
+    processor.accept(socketChannel)
   }
 }
 
@@ -280,8 +229,7 @@ private[kafka] class Processor(val id: Int,
                                val aggregateIdleMeter: Meter,
                                val idleMeter: Meter,
                                val totalProcessorThreads: Int,
-                               val requestChannel: RequestChannel,
-                               connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
+                               val requestChannel: RequestChannel) extends AbstractServerThread {
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
 
@@ -376,6 +324,26 @@ private[kafka] class Processor(val id: Int,
       }
     }
   }
+
+  private def close(key: SelectionKey) {
+    val channel = key.channel.asInstanceOf[SocketChannel]
+    debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+    swallowError(channel.socket().close())
+    swallowError(channel.close())
+    key.attach(null)
+    swallowError(key.cancel())
+  }
+
+  /*
+   * Close all open connections
+   */
+  private def closeAll() {
+    val iter = this.selector.keys().iterator()
+    while (iter.hasNext) {
+      val key = iter.next()
+      close(key)
+    }
+  }
 
   /**
    * Queue up a new connection for reading
@@ -451,31 +419,3 @@ private[kafka] class Processor(val id: Int,
 
   private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
 
 }
-
-class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
-  private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2))
-  private val counts = mutable.Map[InetAddress, Int]()
-
-  def inc(addr: InetAddress) {
-    counts synchronized {
-      val count = counts.getOrElse(addr, 0)
-      counts.put(addr, count + 1)
-      val max = overrides.getOrElse(addr, defaultMax)
-      if(count >= max)
-        throw new TooManyConnectionsException(addr, max)
-    }
-  }
-
-  def dec(addr: InetAddress) {
-    counts synchronized {
-      val count = counts.get(addr).get
-      if(count == 1)
-        counts.remove(addr)
-      else
-        counts.put(addr, count - 1)
-    }
-  }
-
-}
-
-class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 50b09ed..bb2e654 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -106,12 +106,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum number of bytes in a socket request */
   val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
-
-  /* the maximum number of connections we allow from each ip address */
-  val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
-
-  /* per-ip or hostname overrides to the default maximum number of connections */
-  val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
 
   /*********** Log Configuration ***********/
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index def1dc2..5a56f57 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -91,8 +91,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.queuedMaxRequests,
                                     config.socketSendBufferBytes,
                                     config.socketReceiveBufferBytes,
-                                    config.socketRequestMaxBytes,
-                                    config.maxConnectionsPerIp)
+                                    config.socketRequestMaxBytes)
     socketServer.startup()
 
     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a56a77..897783c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,9 +36,9 @@ object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
 }
 
-class ReplicaManager(val config: KafkaConfig, 
-                     time: Time, 
-                     val zkClient: ZkClient, 
+class ReplicaManager(val config: KafkaConfig,
+                     time: Time,
+                     val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -46,8 +46,6 @@ class ReplicaManager(val config: KafkaConfig,
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val localBrokerId = config.brokerId
   private val allPartitions = new Pool[(String, Int), Partition]
-  private var leaderPartitions = new mutable.HashSet[Partition]()
-  private val leaderPartitionsLock = new Object
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@@ -60,9 +58,7 @@ class ReplicaManager(val config: KafkaConfig,
     "LeaderCount",
     new Gauge[Int] {
       def value = {
-        leaderPartitionsLock synchronized {
-          leaderPartitions.size
-        }
+          getLeaderPartitions().size
       }
     }
   )
@@ -82,9 +78,7 @@ class ReplicaManager(val config: KafkaConfig,
   val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount(): Int = {
-    leaderPartitionsLock synchronized {
-      leaderPartitions.count(_.isUnderReplicated)
-    }
+    getLeaderPartitions().count(_.isUnderReplicated)
   }
 
   def startHighWaterMarksCheckPointThread() = {
@@ -117,9 +111,6 @@ class ReplicaManager(val config: KafkaConfig,
     val errorCode = ErrorMapping.NoError
     getPartition(topic, partitionId) match {
       case Some(partition) =>
-        leaderPartitionsLock synchronized {
-          leaderPartitions -= partition
-        }
         if(deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
           if (removedPartition != null)
@@ -331,10 +322,6 @@ class ReplicaManager(val config: KafkaConfig,
 
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
-      // Finally add these partitions to the list of partitions for which the leader is the current broker
-      leaderPartitionsLock synchronized {
-        leaderPartitions ++= partitionState.keySet
-      }
     } catch {
       case e: Throwable =>
         partitionState.foreach { state =>
@@ -383,9 +370,6 @@ class ReplicaManager(val config: KafkaConfig,
         responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
       try {
-        leaderPartitionsLock synchronized {
-          leaderPartitions --= partitionState.keySet
-        }
 
         var partitionsToMakeFollower: Set[Partition] = Set()
 
@@ -464,11 +448,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    var curLeaderPartitions: List[Partition] = null
-    leaderPartitionsLock synchronized {
-      curLeaderPartitions = leaderPartitions.toList
-    }
-    curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
@@ -480,6 +460,9 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def getLeaderPartitions() : List[Partition] = {
+    allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
+  }
 
   /**
    * Flushes the highwatermark value for all partitions to the highwatermark file
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 29cc01b..5d3c57a 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -203,6 +203,37 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   @Test
+  def testAutoCreateAfterDeleteTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    verifyTopicDeletion(topic, servers)
+    // test if first produce request after topic deletion auto creates the topic
+    val props = new Properties()
+    props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("producer.type", "sync")
+    props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "1")
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[String, String](producerConfig)
+    try {
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    } catch {
+      case e: FailedToSendMessageException => fail("Topic should have been auto created")
+      case oe: Throwable => fail("fails with exception", oe)
+    }
+    // test the topic path exists
+    assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // wait until leader is elected
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+    servers.foreach(_.shutdown())
+  }
+
+  @Test
   def testDeleteNonExistingTopic() {
     val topicAndPartition = TopicAndPartition("test", 0)
     val topic = topicAndPartition.topic
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 3b83a86..1c492de 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -40,8 +40,7 @@ class SocketServerTest extends JUnitSuite {
                                               maxQueuedRequests = 50,
                                               sendBufferSize = 300000,
                                               recvBufferSize = 300000,
-                                              maxRequestSize = 50,
-                                              maxConnectionsPerIp = 5)
+                                              maxRequestSize = 50)
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -76,7 +75,7 @@ class SocketServerTest extends JUnitSuite {
   def cleanup() {
     server.shutdown()
   }
-
+  
   @Test
   def simpleRequest() {
     val socket = connect()
@@ -140,19 +139,4 @@ class SocketServerTest extends JUnitSuite {
     // doing a subsequent send should throw an exception as the connection should be closed.
     sendRequest(socket, 0, bytes)
   }
-
-  @Test
-  def testMaxConnectionsPerIp() {
-    // make the maximum allowable number of connections and then leak them
-    val conns = (0 until server.maxConnectionsPerIp).map(i => connect())
-
-    // now try one more (should fail)
-    try {
-      val conn = connect()
-      sendRequest(conn, 100, "hello".getBytes)
-      assertEquals(-1, conn.getInputStream().read())
-    } catch {
-      case e: IOException => // this is good
-    }
-  }
 }