Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1384474)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -179,33 +179,4 @@
     assertTrue("The last flush time has to be within defaultflushInterval of current time ",
                      (System.currentTimeMillis - log.getLastFlushedTime) < 100)
   }
-
-  @Test
-  def testConfigurablePartitions() {
-    val props = TestUtils.createBrokerConfig(0, -1)
-    logManager.shutdown()
-    config = new KafkaConfig(props) {
-                   override val logFileSize = 256
-                   override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
-                   override val flushInterval = 100
-                 }
-    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
-    logManager.startup
-
-    for(i <- 0 until 1) {
-      val log = logManager.getOrCreateLog(name, i)
-      for(i <- 0 until 250) {
-        var set = TestUtils.singleMessageSet("test".getBytes())
-        log.append(set)
-      }
-    }
-
-    try
-    {
-      val log = logManager.getOrCreateLog(name, 2)
-      assertTrue("Should not come here", log != null)
-    } catch {
-       case _ =>
-    }
-  }
 }
Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/SocketServerTest.scala	(revision 1384474)
+++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala	(working copy)
@@ -33,7 +33,6 @@
   val server: SocketServer = new SocketServer(0,
                                               port = TestUtils.choosePort,
                                               numProcessorThreads = 1,
-                                              monitoringPeriodSecs = 30,
                                               maxQueuedRequests = 50,
                                               maxRequestSize = 50)
   server.startup()
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision 1384474)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(working copy)
@@ -34,7 +34,6 @@
 class SocketServer(val brokerId: Int,
                    val port: Int,
                    val numProcessorThreads: Int, 
-                   val monitoringPeriodSecs: Int,
                    val maxQueuedRequests: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1384474)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -64,9 +64,6 @@
   /* the number of queued requests allowed before blocking the network threads */
   val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
 
-  /* the interval in which to measure performance statistics */
-  val monitoringPeriodSecs = props.getIntInRange("monitoring.period.secs", 600, (1, Int.MaxValue))
-  
   /* the default number of log partitions per topic */
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
   
@@ -112,9 +109,6 @@
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
   val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
 
-   /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
-  val topicPartitionsMap = Utils.getTopicPartitions(props.getString("topic.partition.count.map", ""))
-
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
 
@@ -143,9 +137,6 @@
 
   val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
 
-  /* size of the state change request queue in Zookeeper */
-  val stateChangeQSize = props.getInt("state.change.queue.size", 1000)
-
   /**
    * Config options relevant to a follower for a replica
    */
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1384474)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -75,7 +75,6 @@
     socketServer = new SocketServer(config.brokerId,
                                     config.port,
                                     config.numNetworkThreads,
-                                    config.monitoringPeriodSecs,
                                     config.numQueuedRequests,
                                     config.maxSocketRequestSize)
 
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1384474)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -23,7 +23,7 @@
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.{KafkaException, UnknownTopicOrPartitionException}
+import kafka.common.KafkaException
 
 /**
  * The guy who creates and hands out logs
@@ -95,12 +95,6 @@
    * Create a log for the given topic and the given partition
    */
   private def createLog(topic: String, partition: Int): Log = {
-    if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
-      val error = "Wrong partition %d, valid partitions (0, %d)."
-              .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
-      warn(error)
-      throw new UnknownTopicOrPartitionException(error)
-    }
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
