Index: branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -18,81 +18,104 @@
 package kafka.server
 
 import java.util.Properties
-import kafka.utils.{Utils, ZKConfig}
+import kafka.utils.{Utils, ZKConfig, Logging}
 import kafka.message.Message
 
 /**
  * Configuration settings for the kafka server
  */
-class KafkaConfig(props: Properties) extends ZKConfig(props) {
+class KafkaConfig(props: Properties) extends ZKConfig(props) with Logging {
   /* the port to listen and accept connections on */
   val port: Int = Utils.getInt(props, "port", 6667)
-
+  info("Configured port to " + port + ".")
+  
   /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
   val hostName: String = Utils.getString(props, "hostname", null)
+  info("Configured hostname " + ( if (hostName != null) hostName else "(null)" ) + ".  Null may be overridden by InetAddress.getLocalHost.getHostAddress when connecting to zookeeper.")
 
   /* the broker id for this server */
   val brokerId: Int = Utils.getInt(props, "brokerid")
+  info("Configured brokerid to " + brokerId + ".")
   
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
+  info("Configured socket.send.buffer to " + socketSendBuffer + ".")
   
   /* the SO_RCVBUFF buffer of the socket sever sockets */
   val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
+  info("Configured socket.receive.buffer to " + socketReceiveBuffer + ".")
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+  info("Configured max.socket.request.bytes to " + maxSocketRequestSize + ".")
   
   /* the number of network threads that the server uses for handling network requests */
   val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
-
+  info("Configured network.threads to " + numNetworkThreads + ".")
+  
   /* the number of io threads that the server uses for carrying out network requests */
   val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
+  info("Configured io.threads to " + numIoThreads + ".")
   
   /* the number of queued requests allowed before blocking the network threads */
   val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
+  info("Configured max.queued.requests to " + numQueuedRequests + ".")
 
   /* the interval in which to measure performance statistics */
   val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
+  info("Configured monitoring.period.secs to " + monitoringPeriodSecs + ".")
   
   /* the default number of log partitions per topic */
   val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
+  info("Configured num.paritions to " + numPartitions + ".")
   
-  /* the directory in which the log data is kept */
+    /* the directory in which the log data is kept */
   val logDir = Utils.getString(props, "log.dir")
+  info("Configured log.dir to " + logDir + ".")
   
   /* the maximum size of a single log file */
   val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+  info("Configured log.file.size to " + logFileSize + ".")
   
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+  info("Configured log.flush.interval to " + flushInterval + ".")
   
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+  info("Configured log.retention.hours to " + logRetentionHours + ".")
   
   /* the maximum size of the log before deleting it */
   val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+  info("Configured log.retention.size to " + logRetentionSize + ".")
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  info("Configured topic.log.retention.hours to " + logRetentionHoursMap.mkString("{",", ","}") + ".")
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  info("Configured log.cleanup.interval.mins to " + logCleanupIntervalMinutes + ".")
   
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
+  info("Configured topic.flush.intervals.ms to " + flushIntervalMap.mkString("{",", ","}") + ".")
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
   val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms",  3000)
+  info("Configured log.default.flush.scheduler.interval.ms to " + flushSchedulerThreadRate + ".")
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
   val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
+  info("Configured log.default.flush.interval.ms to " + defaultFlushIntervalMs + ".")
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+  info("Configured topic.partition.count.map to " + topicPartitionsMap.mkString("{",", ","}") + ".")
 
   /* enable auto creation of topic on the server */
   val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+  info("Configured auto.create.topics to " + autoCreateTopics + ".")
 
   /**
    * Following properties are relevant to Kafka replication
@@ -100,9 +123,11 @@
 
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
+  info("Configured default.replication.factor to " + defaultReplicationFactor + ".")
 
   /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
-
+  info("Configured preferred.replica.wait.time to " + preferredReplicaWaitTime + ".")
+  
  }
Index: branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -55,7 +55,15 @@
 
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
-    val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
+
+   val hostName = if (config.hostName == null) {
+    	val hostName = InetAddress.getLocalHost.getHostAddress
+    	logger.info("No hostname configured.  Using hostname: " + hostName + " to register this broker in Zookeeper.")
+    	hostName
+    } else {
+    	config.hostName
+    }
+    
     val creatorId = hostName + "-" + System.currentTimeMillis
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
