From 76cd5f5802a80779f5a12ebb8664ae9f19753a84 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Wed, 2 Jan 2013 17:06:33 -0800
Subject: [PATCH 1/6]  Committer: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>

 On branch 0.8
 Your branch is behind 'origin/0.8' by 3 commits, and can be fast-forwarded.

 Changes to be committed:
   (use "git reset HEAD <file>..." to unstage)

	modified:   core/src/main/scala/kafka/consumer/ConsumerConfig.scala
	modified:   core/src/main/scala/kafka/log/LogManager.scala
	modified:   core/src/main/scala/kafka/producer/ProducerConfig.scala
	modified:   core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
	modified:   core/src/main/scala/kafka/server/KafkaConfig.scala
	modified:   core/src/main/scala/kafka/server/KafkaServer.scala
	modified:   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
	modified:   core/src/main/scala/kafka/server/ReplicaManager.scala
	modified:   core/src/test/scala/unit/kafka/log/LogManagerTest.scala
	modified:   core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
	modified:   core/src/test/scala/unit/kafka/producer/ProducerTest.scala
	modified:   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
	modified:   core/src/test/scala/unit/kafka/utils/TestUtils.scala
---
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   24 ++++++++++----------
 core/src/main/scala/kafka/log/LogManager.scala     |   20 ++++++++--------
 .../main/scala/kafka/producer/ProducerConfig.scala |    2 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    4 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |    3 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    4 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   14 +++++-----
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |    2 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |    2 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |    6 ++--
 13 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index b379c9d..219df74 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -52,11 +52,11 @@ object ConsumerConfig extends Config {
   }
 
   def validateClientId(clientId: String) {
-    validateChars("clientid", clientId)
+    validateChars("client.id", clientId)
   }
 
   def validateGroupId(groupId: String) {
-    validateChars("groupid", groupId)
+    validateChars("group.id", groupId)
   }
 
   def validateAutoOffsetReset(autoOffsetReset: String) {
@@ -77,32 +77,32 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   }
 
   /** a string that uniquely identifies a set of consumers within the same consumer group */
-  val groupId = props.getString("groupid")
+  val groupId = props.getString("group.id")
 
   /** consumer id: generated automatically if not set.
    *  Set this explicitly for only testing purpose. */
-  val consumerId: Option[String] = Option(props.getString("consumerid", null))
+  val consumerId: Option[String] = Option(props.getString("consumer.id", null))
 
   /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
   val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
   
   /** the socket receive buffer for network requests */
-  val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
+  val socketBufferSize = props.getInt("socket.buffer.size", SocketBufferSize)
   
   /** the number of byes of messages to attempt to fetch */
   val fetchSize = props.getInt("fetch.size", FetchSize)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
-  val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
+  val autoCommit = props.getBoolean("auto.commit.enable", AutoCommit)
   
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
-  val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
+  val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
 
   /** max number of messages buffered for consumption */
-  val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
+  val maxQueuedChunks = props.getInt("max.queued.chunks", MaxQueuedChunks)
 
   /** max number of retries during rebalance */
-  val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
+  val maxRebalanceRetries = props.getInt("max.rebalance.retries", MaxRebalanceRetries)
   
   /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
   val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
@@ -120,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
      anything else: throw exception to the consumer */
-  val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
+  val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
@@ -129,12 +129,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
    *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
    *  overhead of decompression.
    *  */
-  val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
+  val enableShallowIterator = props.getBoolean("enable.shallow.iterator", false)
 
   /**
    * Client id is specified by the kafka consumer client, used to distinguish different clients
    */
-  val clientId = props.getString("clientid", groupId)
+  val clientId = props.getString("client.id", groupId)
 
   validate(this)
 }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 5f0148c..c85f869 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -43,15 +43,15 @@ private[kafka] class LogManager(val config: KafkaConfig,
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
   val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
-  private val logFileSizeMap = config.logFileSizeMap
-  private val logFlushInterval = config.flushInterval
-  private val logFlushIntervals = config.flushIntervalMap
+  private val logFileSizeMap = config.logTopicFileSizeMap
+  private val logFlushInterval = config.logFlushInterval
+  private val logFlushIntervals = config.logTopicFlushIntervalMsMap
   private val logCreationLock = new Object
-  private val logRetentionSizeMap = config.logRetentionSizeMap
-  private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
-  private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
+  private val logRetentionSizeMap = config.logTopicRetentionSizeMap
+  private val logRetentionMsMap = config.logTopicRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+  private val logRollMsMap = config.logTopicRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
   private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
-  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
   private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
 
   this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
@@ -139,10 +139,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
     if(scheduler != null) {
       info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
       scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
-      info("Starting log flusher every " + config.flushSchedulerThreadRate +
+      info("Starting log flusher every " + config.logFlushSchedulerIntervalMs +
                    " ms with the following overrides " + logFlushIntervals)
       scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
-                                 config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+                                 config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false)
     }
   }
   
@@ -310,7 +310,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
     for (log <- allLogs) {
       try {
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
-        var logFlushInterval = config.defaultFlushIntervalMs
+        var logFlushInterval = config.logDefaultFlushIntervalMs
         if(logFlushIntervals.contains(log.topicName))
           logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval  " + logFlushInterval +
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index e559187..afab5d9 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -109,7 +109,7 @@ class ProducerConfig private (val props: VerifiableProperties)
    * ZK cache needs to be updated.
    * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
    */
-  val producerRetries = props.getInt("producer.num.retries", 3)
+  val producerRetryCount = props.getInt("producer.retry.count", 3)
 
   val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
 
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 4f04862..404c466 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -56,7 +56,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
-      var remainingRetries = config.producerRetries + 1
+      var remainingRetries = config.producerRetryCount + 1
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
         if (outstandingProduceRequests.size > 0)  {
@@ -71,7 +71,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       if(outstandingProduceRequests.size > 0) {
         producerStats.failedSendRate.mark()
         error("Failed to send the following requests: " + outstandingProduceRequests)
-        throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
+        throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetryCount + " tries.", null)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 962b65f..8dd3a4d 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
+import java.net.InetAddress
 import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
 
 /**
@@ -57,7 +58,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* hostname of broker. If this is set, it will only bind to this address.  If this is not set,
    * it will bind to all interfaces, and publish one to ZK */
-  val hostName: String = props.getString("hostname", null)
+  val hostName: String = props.getString("host.name", null)
 
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ae35e4f..96dbe91 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -65,8 +65,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.hostName,
                                     config.port,
                                     config.numNetworkThreads,
-                                    config.numQueuedRequests,
-                                    config.maxSocketRequestSize)
+                                    config.maxQueuedRequests,
+                                    config.maxSocketRequestBytes)
 
     socketServer.startup
 
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c1d3235..3b855b5 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -34,8 +34,8 @@ class ReplicaFetcherThread(name:String,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,
                                 fetchSize = brokerConfig.replicaFetchSize,
                                 fetcherBrokerId = brokerConfig.brokerId,
-                                maxWait = brokerConfig.replicaMaxWaitTimeMs,
-                                minBytes = brokerConfig.replicaMinBytes) {
+                                maxWait = brokerConfig.replicaFetchMaxWaitTimeMs,
+                                minBytes = brokerConfig.replicaFetchMinExpectedBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42068ca..beef5b1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
+      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b06d812..c250312 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -41,7 +41,7 @@ class LogManagerTest extends JUnit3Suite {
     super.setUp()
     config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
                    override val logFileSize = 1024
-                   override val flushInterval = 10000
+                   override val logFlushInterval = 10000
                    override val logRetentionHours = maxLogAgeHours
                  }
     scheduler.startup
@@ -117,7 +117,7 @@ class LogManagerTest extends JUnit3Suite {
       override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
-      override val flushInterval = 100
+      override val logFlushInterval = 100
       override val logRollHours = maxRollInterval
     }
     logManager = new LogManager(config, scheduler, time)
@@ -159,10 +159,10 @@ class LogManagerTest extends JUnit3Suite {
     logManager.shutdown()
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024 *1024 *1024
-                   override val flushSchedulerThreadRate = 50
-                   override val flushInterval = Int.MaxValue
+                   override val logFlushSchedulerIntervalMs = 50
+                   override val logFlushInterval = Int.MaxValue
                    override val logRollHours = maxRollInterval
-                   override val flushIntervalMap = Map("timebasedflush" -> 100)
+                   override val logTopicFlushIntervalMsMap = Map("timebasedflush" -> 100)
                  }
     logManager = new LogManager(config, scheduler, time)
     logManager.startup
@@ -173,7 +173,7 @@ class LogManagerTest extends JUnit3Suite {
     }
     val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
     assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
-                     ellapsed < 2*config.flushSchedulerThreadRate)
+                     ellapsed < 2*config.logFlushSchedulerIntervalMs)
   }
   
   @Test
@@ -183,7 +183,7 @@ class LogManagerTest extends JUnit3Suite {
     val dirs = Seq(TestUtils.tempDir().getAbsolutePath, 
                    TestUtils.tempDir().getAbsolutePath, 
                    TestUtils.tempDir().getAbsolutePath)
-    props.put("log.directories", dirs.mkString(","))
+    props.put("log.dirs", dirs.mkString(","))
     logManager.shutdown()
     logManager = new LogManager(new KafkaConfig(props), scheduler, time)
     
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index c6ea3b6..dff6b24 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -198,7 +198,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
     val props = new Properties
-    props.put("brokerid", nodeId.toString)
+    props.put("broker.id", nodeId.toString)
     props.put("port", port.toString)
     props.put("log.dir", getLogDir.getAbsolutePath)
     props.put("log.flush.interval", "1")
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 48842eb..34ef470 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -300,7 +300,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     // make sure we don't wait fewer than numRetries*timeoutMs milliseconds
     // we do this because the DefaultEventHandler retries a number of times
-    assertTrue((t2-t1) >= timeoutMs*config.producerRetries)
+    assertTrue((t2-t1) >= timeoutMs*config.producerRetryCount)
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index a3afa2d..dfdb0c3 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -30,7 +30,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaMaxLagTimeMs = 5000L
     override val replicaMaxLagBytes = 10L
-    override val replicaMinBytes = 20
+    override val replicaFetchMinExpectedBytes = 20
   })
   val topic = "new-topic"
   val partitionId = 0
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a508895..ee4981b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -123,7 +123,7 @@ object TestUtils extends Logging {
    */
   def createBrokerConfig(nodeId: Int, port: Int): Properties = {
     val props = new Properties
-    props.put("brokerid", nodeId.toString)
+    props.put("broker.id", nodeId.toString)
     props.put("hostname", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@@ -140,8 +140,8 @@ object TestUtils extends Logging {
                                consumerTimeout: Long = -1): Properties = {
     val props = new Properties
     props.put("zk.connect", zkConnect)
-    props.put("groupid", groupId)
-    props.put("consumerid", consumerId)
+    props.put("group.id", groupId)
+    props.put("consumer.id", consumerId)
     props.put("consumer.timeout.ms", consumerTimeout.toString)
     props.put("zk.sessiontimeout.ms", "400")
     props.put("zk.synctime.ms", "200")
-- 
1.7.1


From 9cfe37cc42d274894a8776f1568be34bece611d2 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Wed, 2 Jan 2013 18:06:28 -0800
Subject: [PATCH 2/6]  Committer: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>

 On branch 0.8
 Your branch and 'origin/0.8' have diverged,
 and have 1 and 3 different commit(s) each, respectively.

 Changes to be committed:
   (use "git reset HEAD <file>..." to unstage)

	modified:   config/consumer.properties
	modified:   config/server.properties
---
 config/consumer.properties |    2 +-
 config/server.properties   |    6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/config/consumer.properties b/config/consumer.properties
index a067ac0..1c43bf9 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -23,7 +23,7 @@ zk.connect=127.0.0.1:2181
 zk.connectiontimeout.ms=1000000
 
 #consumer group id
-groupid=test-consumer-group
+group.id=test-consumer-group
 
 #consumer timeout
 #consumer.timeout.ms=5000
diff --git a/config/server.properties b/config/server.properties
index f4521fb..4e14089 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -17,7 +17,7 @@
 ############################# Server Basics #############################
 
 # The id of the broker. This must be set to a unique integer for each broker.
-brokerid=0
+broker.id=0
 
 ############################# Socket Server Settings #############################
 
@@ -27,7 +27,7 @@ port=9092
 # Hostname the broker will bind to and advertise to producers and consumers.
 # If not set, the server will bind to all interfaces and advertise the value returned from
 # from java.net.InetAddress.getCanonicalHostName().
-#hostname=localhost
+#host.name=localhost
 
 # The number of threads handling network requests
 network.threads=2
@@ -78,7 +78,7 @@ log.default.flush.interval.ms=1000
 #topic.flush.intervals.ms=topic1:1000, topic2:3000
 
 # The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
+log.flush.scheduler.interval.ms=1000
 
 ############################# Log Retention Policy #############################
 
-- 
1.7.1


From 1f0d1490aeb9f8bcce92209b05158af6cf850847 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 3 Jan 2013 10:12:30 -0800
Subject: [PATCH 3/6]  Committer: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>

 On branch 0.8
 Your branch is ahead of 'origin/0.8' by 2 commits.

 Changes to be committed:
   (use "git reset HEAD <file>..." to unstage)

	modified:   core/src/main/scala/kafka/server/KafkaConfig.scala
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   49 ++++++++++----------
 1 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8dd3a4d..c85f038 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,28 +37,27 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /*********** General Configuration ***********/
   
   /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
+  val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
 
   /* the maximum size of message that the server can receive */
   val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
   
   /* the number of network threads that the server uses for handling network requests */
-  val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
+  val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
 
   /* the number of io threads that the server uses for carrying out network requests */
-  val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+  val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
   
   /* the number of queued requests allowed before blocking the network threads */
-  val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+  val maxQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
   
   /*********** Socket Server Configuration ***********/
   
   /* the port to listen and accept connections on */
   val port: Int = props.getInt("port", 6667)
 
-  /* hostname of broker. If this is set, it will only bind to this address.  If this is not set,
-   * it will bind to all interfaces, and publish one to ZK */
-  val hostName: String = props.getString("host.name", null)
+  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
+  val hostName: String = props.getString("host.name", InetAddress.getLocalHost.getHostAddress)
 
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
@@ -67,7 +66,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
   
   /* the maximum number of bytes in a socket request */
-  val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+  val maxSocketRequestBytes: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
   
   /*********** Log Configuration ***********/
 
@@ -75,53 +74,53 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
   
   /* the directories in which the log data is kept */
-  val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", "")))
+  val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "")))
   require(logDirs.size > 0)
   
   /* the maximum size of a single log file */
   val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum size of a single log file for some specific topic */
-  val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt)
+  val logTopicFileSizeMap = props.getMap("log.topic.file.size", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum time before a new log segment is rolled out */
   val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours before rolling out a new log segment for some specific topic */
-  val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt)  
+  val logTopicRollHoursMap = props.getMap("log.topic.roll.hours", _.toInt > 0).mapValues(_.toInt)
 
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt)
+  val logTopicRetentionHoursMap = props.getMap("log.topic.retention.hours", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum size of the log before deleting it */
   val logRetentionSize = props.getLong("log.retention.size", -1)
 
   /* the maximum size of the log for some specific topic before deleting it */
-  val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong)
+  val logTopicRetentionSizeMap = props.getMap("log.topic.retention.size", _.toLong > 0).mapValues(_.toLong)
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
-  val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
   
   /* the maximum size in bytes of the offset index */
-  val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size", 10*1024*1024, (4, Int.MaxValue))
+  val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size.bytes", 10*1024*1024, (4, Int.MaxValue))
   
   /* the interval with which we add an entry to the offset index */
   val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
+  val logFlushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
-  val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+  val logTopicFlushIntervalMsMap = props.getMap("log.topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
-  val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms",  3000)
+  val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms",  3000)
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
-  val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
+  val logDefaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", logFlushSchedulerIntervalMs)
 
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
@@ -151,22 +150,22 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
 
   /* max wait time for each fetcher request issued by follower replicas*/
-  val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
+  val replicaFetchMaxWaitTimeMs = props.getInt("replica.fetch.max.wait.time.ms", 500)
 
   /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
-  val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 1)
+  val replicaFetchMinExpectedBytes = props.getInt("replica.fetch.min.expected.bytes", 1)
 
   /* number of fetcher threads used to replicate messages from a source broker.
    * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
-  val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+  val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
   
   /* the frequency with which the highwater mark is saved out to disk */
-  val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+  val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
 
   /* the purge interval (in number of requests) of the fetch request purgatory */
-  val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+  val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.request.purgatory.purge.interval", 10000)
 
   /* the purge interval (in number of requests) of the producer request purgatory */
-  val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+  val producerRequestPurgatoryPurgeInterval = props.getInt("producer.request.purgatory.purge.interval", 10000)
 
  }
-- 
1.7.1


From 5140a6879065f9958e5c8e340164e537a619d94e Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 3 Jan 2013 11:03:15 -0800
Subject: [PATCH 4/6]  Committer: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>

 On branch 0.8
 Your branch is ahead of 'origin/0.8' by 3 commits.

 Changes to be committed:
   (use "git reset HEAD <file>..." to unstage)

	modified:   core/src/main/scala/kafka/server/KafkaConfig.scala
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |    5 +++--
 1 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c85f038..7fc6ef5 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -56,8 +56,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the port to listen and accept connections on */
   val port: Int = props.getInt("port", 6667)
 
-  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
-  val hostName: String = props.getString("host.name", InetAddress.getLocalHost.getHostAddress)
+  /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
+   * it will bind to all interfaces, and publish one to ZK */
+  val hostName: String = props.getString("host.name", null)
 
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
-- 
1.7.1


From 386047225068ebe955bcaee1030c39c003010fd7 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 3 Jan 2013 11:54:00 -0800
Subject: [PATCH 5/6]  Committer: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>

 On branch 0.8
 Your branch is ahead of 'origin/0.8' by 4 commits.

 Changes to be committed:
   (use "git reset HEAD <file>..." to unstage)

	modified:   core/src/main/scala/kafka/server/KafkaConfig.scala
	modified:   core/src/test/scala/unit/kafka/utils/TestUtils.scala
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |    1 -
 .../test/scala/unit/kafka/utils/TestUtils.scala    |    2 +-
 2 files changed, 1 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7fc6ef5..8fccde5 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
-import java.net.InetAddress
 import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
 
 /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ee4981b..dec245a 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -124,7 +124,7 @@ object TestUtils extends Logging {
   def createBrokerConfig(nodeId: Int, port: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
-    props.put("hostname", "localhost")
+    props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval", "1")
-- 
1.7.1


From 609566bf2861b13d87da4718aec40c6ba6c10b5f Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 3 Jan 2013 13:52:45 -0800
Subject: [PATCH 6/6]  KAFKA-648 Use uniform convention for naming properties keys

 On branch 0.8
 Your branch is ahead of 'origin/0.8' by 5 commits.

 Changes to be committed:
   (use "git reset HEAD <file>..." to unstage)

	modified:   config/server.properties
	modified:   core/src/main/scala/kafka/log/LogManager.scala
	modified:   core/src/main/scala/kafka/server/KafkaConfig.scala
	modified:   core/src/test/scala/unit/kafka/log/LogManagerTest.scala
	modified:   core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
---
 config/server.properties                           |    2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |   14 +++++++-------
 core/src/main/scala/kafka/server/KafkaConfig.scala |   12 ++++++------
 .../test/scala/unit/kafka/log/LogManagerTest.scala |    8 ++++----
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |    2 +-
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/config/server.properties b/config/server.properties
index 4e14089..241e46e 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -95,7 +95,7 @@ log.retention.hours=168
 #log.retention.size=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.size=536870912
 
 # The interval at which log segments are checked to see if they can be deleted according 
 # to the retention policies
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c85f869..e4c948f 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -43,13 +43,13 @@ private[kafka] class LogManager(val config: KafkaConfig,
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
   val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
-  private val logFileSizeMap = config.logTopicFileSizeMap
+  private val logFileSizeMap = config.logSegmentSizePerTopicMap
   private val logFlushInterval = config.logFlushInterval
-  private val logFlushIntervals = config.logTopicFlushIntervalMsMap
+  private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
   private val logCreationLock = new Object
-  private val logRetentionSizeMap = config.logTopicRetentionSizeMap
-  private val logRetentionMsMap = config.logTopicRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
-  private val logRollMsMap = config.logTopicRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
+  private val logRetentionSizeMap = config.logRetentionSizePerTopicMap
+  private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+  private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
   private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
   private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
   private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
@@ -111,7 +111,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
             info("Loading log '" + dir.getName + "'")
             val topicPartition = parseTopicPartitionName(dir.getName)
             val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-            val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+            val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentSize)
             val log = new Log(dir, 
                               maxLogFileSize, 
                               config.maxMessageSize, 
@@ -186,7 +186,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
       val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
       dir.mkdirs()
       val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentSize)
       log = new Log(dir, 
                     maxLogFileSize, 
                     config.maxMessageSize, 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8fccde5..12592eb 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -78,28 +78,28 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   require(logDirs.size > 0)
   
   /* the maximum size of a single log file */
-  val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+  val logSegmentSize = props.getIntInRange("log.segment.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum size of a single log file for some specific topic */
-  val logTopicFileSizeMap = props.getMap("log.topic.file.size", _.toInt > 0).mapValues(_.toInt)
+  val logSegmentSizePerTopicMap = props.getMap("log.segment.size.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum time before a new log segment is rolled out */
   val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours before rolling out a new log segment for some specific topic */
-  val logTopicRollHoursMap = props.getMap("log.topic.roll.hours", _.toInt > 0).mapValues(_.toInt)
+  val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logTopicRetentionHoursMap = props.getMap("log.topic.retention.hours", _.toInt > 0).mapValues(_.toInt)
+  val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum size of the log before deleting it */
   val logRetentionSize = props.getLong("log.retention.size", -1)
 
   /* the maximum size of the log for some specific topic before deleting it */
-  val logTopicRetentionSizeMap = props.getMap("log.topic.retention.size", _.toLong > 0).mapValues(_.toLong)
+  val logRetentionSizePerTopicMap = props.getMap("log.retention.size.per.topic", _.toLong > 0).mapValues(_.toLong)
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
@@ -114,7 +114,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logFlushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
-  val logTopicFlushIntervalMsMap = props.getMap("log.topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+  val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.intervals.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
   val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms",  3000)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index c250312..ac5e38b 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -40,7 +40,7 @@ class LogManagerTest extends JUnit3Suite {
   override def setUp() {
     super.setUp()
     config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
-                   override val logFileSize = 1024
+                   override val logSegmentSize = 1024
                    override val logFlushInterval = 10000
                    override val logRetentionHours = maxLogAgeHours
                  }
@@ -114,7 +114,7 @@ class LogManagerTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-      override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
+      override val logSegmentSize = (10 * (setSize - 1)) // each segment will be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val logFlushInterval = 100
@@ -158,11 +158,11 @@ class LogManagerTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-                   override val logFileSize = 1024 *1024 *1024
+                   override val logSegmentSize = 1024 *1024 *1024
                    override val logFlushSchedulerIntervalMs = 50
                    override val logFlushInterval = Int.MaxValue
                    override val logRollHours = maxRollInterval
-                   override val logTopicFlushIntervalMsMap = Map("timebasedflush" -> 100)
+                   override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100)
                  }
     logManager = new LogManager(config, scheduler, time)
     logManager.startup
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index dff6b24..e33623d 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -206,7 +206,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put("num.partitions", "20")
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
-    props.put("log.file.size", logSize.toString)
+    props.put("log.segment.size", logSize.toString)
     props.put("zk.connect", zkConnect.toString)
     props
   }
-- 
1.7.1

