Index: branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -18,81 +18,105 @@
 package kafka.server
 
 import java.util.Properties
-import kafka.utils.{Utils, ZKConfig}
+import kafka.utils.{Utils, ZKConfig, Logging}
 import kafka.message.Message
 
 /**
  * Configuration settings for the kafka server
  */
-class KafkaConfig(props: Properties) extends ZKConfig(props) {
+class KafkaConfig(props: Properties) extends ZKConfig(props) with Logging {
+  
   /* the port to listen and accept connections on */
   val port: Int = Utils.getInt(props, "port", 6667)
-
+  logConfig("port", port)
+  
   /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
   val hostName: String = Utils.getString(props, "hostname", null)
+  logConfig("hostname", hostName)
 
   /* the broker id for this server */
   val brokerId: Int = Utils.getInt(props, "brokerid")
+  logConfig("brokerid", brokerId)
   
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
+  logConfig("socket.send.buffer", socketSendBuffer)
   
   /* the SO_RCVBUFF buffer of the socket sever sockets */
   val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
+  logConfig("socket.receive.buffer", socketReceiveBuffer)
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+  logConfig("max.socket.request.bytes", maxSocketRequestSize)
   
   /* the number of network threads that the server uses for handling network requests */
   val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
-
+  logConfig("network.threads", numNetworkThreads)
+  
   /* the number of io threads that the server uses for carrying out network requests */
   val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
+  logConfig("io.threads", numIoThreads)
   
   /* the number of queued requests allowed before blocking the network threads */
   val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
+  logConfig("max.queued.requests", numQueuedRequests)
 
   /* the interval in which to measure performance statistics */
   val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
+  logConfig("monitoring.period.secs", monitoringPeriodSecs)
   
   /* the default number of log partitions per topic */
   val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
+  logConfig("num.paritions", numPartitions)
   
-  /* the directory in which the log data is kept */
+  /* the directory in which the log data is kept */
   val logDir = Utils.getString(props, "log.dir")
+  logConfig("log.dir", logDir)
   
   /* the maximum size of a single log file */
   val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+  logConfig("log.file.size", logFileSize)
   
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
+  logConfig("log.flush.interval", flushInterval)
   
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+  logConfig("log.retention.hours", logRetentionHours)
   
   /* the maximum size of the log before deleting it */
   val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+  logConfig("log.retention.size", logRetentionSize)
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  logConfig("topic.log.retention.hours", logRetentionHoursMap.mkString("{",", ","}"))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  logConfig("log.cleanup.interval.mins", logCleanupIntervalMinutes)
   
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
+  logConfig("topic.flush.intervals.ms", flushIntervalMap.mkString("{",", ","}"))
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
   val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms",  3000)
+  logConfig("log.default.flush.scheduler.interval.ms", flushSchedulerThreadRate)
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
   val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
+  logConfig("log.default.flush.interval.ms", defaultFlushIntervalMs)
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+  logConfig("topic.partition.count.map", topicPartitionsMap.mkString("{",", ","}"))
 
   /* enable auto creation of topic on the server */
   val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+  logConfig("auto.create.topics", autoCreateTopics)
 
   /**
    * Following properties are relevant to Kafka replication
@@ -100,9 +124,11 @@
 
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
+  logConfig("default.replication.factor", defaultReplicationFactor)
 
   /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
-
+  logConfig("preferred.replica.wait.time", preferredReplicaWaitTime)
+  
  }
Index: branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -55,7 +55,15 @@
 
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
-    val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
+
+    val hostName = if (config.hostName == null) {
+      val localHostAddress = InetAddress.getLocalHost.getHostAddress
+      info("No hostname configured. Using hostname: " + localHostAddress + " to register this broker in Zookeeper.")
+      localHostAddress
+    } else {
+      config.hostName
+    }
+
     val creatorId = hostName + "-" + System.currentTimeMillis
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
Index: branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(working copy)
@@ -17,42 +17,53 @@
 
 package kafka.producer
 
-import kafka.utils.Utils
+import kafka.utils.{Utils, Logging}
 import java.util.Properties
 
-class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
+class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared with Logging {
   /** the broker to which the producer sends events */
   val host = Utils.getString(props, "host")
+  logConfig("host", host)
 
   /** the port on which the broker is running */
   val port = Utils.getInt(props, "port")
+  logConfig("port", port)
 }
 
-trait SyncProducerConfigShared {
+trait SyncProducerConfigShared extends Logging {
   val props: Properties
   
   val bufferSize = Utils.getInt(props, "buffer.size", 100*1024)
+  logConfig("buffer.size", bufferSize)
 
   val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
-
+  logConfig("connect.timeout.ms", connectTimeoutMs)
+  
   /** the socket timeout for network requests */
-  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 30000)  
+  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 30000)
+  logConfig("socket.timeout.ms", socketTimeoutMs)
 
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+  logConfig("reconnect.interval", reconnectInterval)
 
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+  logConfig("max.message.size", maxMessageSize)
 
   /* the client application sending the producer requests */
   val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
+  logConfig("producer.request.correlation_id", correlationId)
 
   /* the client application sending the producer requests */
   val clientId = Utils.getString(props,"producer.request.client_id","")
+  logConfig("producer.request.client_id", clientId)
 
   /* the required_acks of the producer requests */
   val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
-
+  logConfig("producer.request.required_acks", requiredAcks)
+  
   /* the ack_timeout of the producer requests */
   val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
+  logConfig("producer.request.ack_timeout", ackTimeout)
 }
 
 object SyncProducerConfig {
Index: branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala	(working copy)
@@ -19,36 +19,40 @@
 
 import async.AsyncProducerConfig
 import java.util.Properties
-import kafka.utils.{ZKConfig, Utils}
+import kafka.utils.{ZKConfig, Utils, Logging}
 import kafka.common.InvalidConfigException
 
 class ProducerConfig(val props: Properties) extends ZKConfig(props)
-        with AsyncProducerConfig with SyncProducerConfigShared{
-
+        with AsyncProducerConfig with SyncProducerConfigShared with Logging {
+  
   /** For bypassing zookeeper based auto partition discovery, use this config   *
    *  to pass in static broker and per-broker partition information. Format-    *
    *  brokerid1:host1:port1, brokerid2:host2:port2*/
   val brokerList = Utils.getString(props, "broker.list", null)
   if(brokerList != null)
     throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
-
+  logConfig("broker.list", brokerList)
+  
   /** If both broker.list and zk.connect options are specified, throw an exception */
   if(zkConnect == null)
     throw new InvalidConfigException("zk.connect property is required")
 
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
-
+  logConfig("partitioner.class", partitionerClass)
+  
   /** this parameter specifies whether the messages are sent asynchronously *
    * or not. Valid values are - async for asynchronous send                 *
    *                            sync for synchronous send                   */
   val producerType = Utils.getString(props, "producer.type", "sync")
-
+  logConfig("producer.type", producerType)
+  
   /**
    * This parameter allows you to specify the compression codec for all data generated *
    * by this producer. The default is NoCompressionCodec
    */
   val compressionCodec = Utils.getCompressionCodec(props, "compression.codec")
+  logConfig("compression.codec", compressionCodec)
 
   /** This parameter allows you to set whether compression should be turned *
    *  on for particular topics
@@ -62,6 +66,7 @@
    *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
   val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+  logConfig("compressed.topics", compressedTopics)
 
   /**
    * The producer using the zookeeper software load balancer maintains a ZK cache that gets
@@ -72,6 +77,9 @@
    * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
    */
   val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
+  logConfig("producer.num.retries", producerRetries)
 
   val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 5)
+  logConfig("producer.retry.backoff.ms", producerRetryBackoffMs)
+  
 }
Index: branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/utils/Logging.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/utils/Logging.scala	(working copy)
@@ -23,6 +23,15 @@
   val loggerName = this.getClass.getName
   lazy val logger = Logger.getLogger(loggerName)
 
+  def logConfig(configItem: String, configValue: Any) = {
+    val logString = configValue match {
+      case configValueMap: Map[_,_] => "Configured " + configItem + " to " + configValueMap.mkString("{",", ","}") + "."
+      case configValueString: String if configValueString.length() == 0 => "Configured " + configItem + " to (zero-length string)."
+      case _ => "Configured " + configItem + " to " + configValue + "."
+    }
+    info(logString)
+  }
+  
   def trace(msg: => String): Unit = {
     if (logger.isTraceEnabled())
       logger.trace(msg)	
Index: branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
===================================================================
--- branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(revision 1325394)
+++ branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 import java.util.Properties
-import kafka.utils.{ZKConfig, Utils}
+import kafka.utils.{ZKConfig, Utils, Logging}
 import kafka.api.OffsetRequest
 import kafka.common.InvalidConfigException
 object ConsumerConfig {
@@ -42,67 +42,83 @@
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
 }
 
-class ConsumerConfig(props: Properties) extends ZKConfig(props) {
+class ConsumerConfig(props: Properties) extends ZKConfig(props) with Logging {
   import ConsumerConfig._
-
+  
   /** a string that uniquely identifies a set of consumers within the same consumer group */
   val groupId = Utils.getString(props, "groupid")
-
+  logConfig("groupid",groupId)
+  
   /** consumer id: generated automatically if not set.
    *  Set this explicitly for only testing purpose. */
   val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null))
-
+  logConfig("consumerid", consumerId)
+  
   /** the socket timeout for network requests */
   val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
+  logConfig("socket.timeout.ms",socketTimeoutMs)
   
   /** the socket receive buffer for network requests */
   val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
+  logConfig("socket.buffersize",socketBufferSize)
   
   /** the number of byes of messages to attempt to fetch */
   val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
+  logConfig("fetch.size", fetchSize)
   
   /** to avoid repeatedly polling a broker node which has no new data
       we will backoff every time we get an empty set from the broker*/
   val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs)
+  logConfig("fetcher.backoff.ms", fetcherBackoffMs)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
+  logConfig("autocommit.enable", autoCommit)
   
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
   val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
-
+  logConfig("autocommit.interval.ms", autoCommitIntervalMs)
+  
   /** max number of messages buffered for consumption */
   val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
-
+  logConfig("queuedchunks.max", maxQueuedChunks)
+  
   /** max number of retries during rebalance */
   val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
+  logConfig("rebalance.retries.max", maxRebalanceRetries)
 
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
+  logConfig("rebalance.backoff.ms", rebalanceBackoffMs)
 
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
      anything else: throw exception to the consumer */
   val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
+  logConfig("autooffset.reset", autoOffsetReset)
 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
+  logConfig("consumer.timeout.ms", consumerTimeoutMs)
 
   /** Whitelist of topics for this mirror's embedded consumer to consume. At
    *  most one of whitelist/blacklist may be specified. */
   val mirrorTopicsWhitelist = Utils.getString(
     props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist)
- 
+  logConfig(MirrorTopicsWhitelistProp, mirrorTopicsWhitelist)
+    
   /** Topics to skip mirroring. At most one of whitelist/blacklist may be
    *  specified */
   val mirrorTopicsBlackList = Utils.getString(
     props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist)
-
+  logConfig(MirrorTopicsBlacklistProp, mirrorTopicsBlackList)
+    
   if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty)
       throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist")
 
   val mirrorConsumerNumThreads = Utils.getInt(
     props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
+  logConfig(MirrorConsumerNumThreadsProp, mirrorConsumerNumThreads)
 }
 
