Details

    • Type: Improvement Improvement
    • Status: Closed
    • Priority: Major Major
    • Resolution: Duplicate
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.7.2, 0.8.0
    • Component/s: None
    • Labels:

      Description

      Currently, it's hard to figure out which configuration value is being used and whether a new configuration is being picked up. Logging all configuration values during startup time can address this issue. We should cover broker, producer and consumer.

      1. KAFKA-246-broker-0.8.patch
        8 kB
        Edward Smith
      2. KAFKA-246-all-0.8.patch
        19 kB
        Edward Smith

        Activity

        Jun Rao made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Jun Rao made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Fix Version/s 0.8 [ 12317244 ]
        Fix Version/s 0.7.2 [ 12322475 ]
        Resolution Duplicate [ 3 ]
        Edward Smith made changes -
        Attachment KAFKA-246-all-0.8.patch [ 12522589 ]
        Edward Smith made changes -
        Comment [ I assumed that JIRA was going to ask me to upload a patch file, but it didn't. So here it is:


        Index: branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
        ===================================================================
        --- branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1325394)
        +++ branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (working copy)
        @@ -18,81 +18,104 @@
         package kafka.server
         
         import java.util.Properties
        -import kafka.utils.{Utils, ZKConfig}
        +import kafka.utils.{Utils, ZKConfig, Logging}
         import kafka.message.Message
         
         /**
          * Configuration settings for the kafka server
          */
        -class KafkaConfig(props: Properties) extends ZKConfig(props) {
        +class KafkaConfig(props: Properties) extends ZKConfig(props) with Logging {
           /* the port to listen and accept connections on */
           val port: Int = Utils.getInt(props, "port", 6667)
        -
        + info("Configured port to " + port + ".")
        +
           /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
           val hostName: String = Utils.getString(props, "hostname", null)
        + info("Configured hostname " + ( if (hostName != null) hostName else "(null)" ) + ". Null may be overridden by InetAddress.getLocalHost.getHostAddress when connecting to zookeeper.")
         
           /* the broker id for this server */
           val brokerId: Int = Utils.getInt(props, "brokerid")
        + info("Configured brokerid to " + brokerId + ".")
           
           /* the SO_SNDBUFF buffer of the socket sever sockets */
           val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
        + info("Configured socket.send.buffer to " + socketSendBuffer + ".")
           
           /* the SO_RCVBUFF buffer of the socket sever sockets */
           val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)
        + info("Configured socket.receive.buffer to " + socketReceiveBuffer + ".")
           
           /* the maximum number of bytes in a socket request */
           val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
        + info("Configured max.socket.request.bytes to " + maxSocketRequestSize + ".")
           
           /* the number of network threads that the server uses for handling network requests */
           val numNetworkThreads = Utils.getIntInRange(props, "network.threads", 3, (1, Int.MaxValue))
        -
        + info("Configured network.threads to " + numNetworkThreads + ".")
        +
           /* the number of io threads that the server uses for carrying out network requests */
           val numIoThreads = Utils.getIntInRange(props, "io.threads", 8, (1, Int.MaxValue))
        + info("Configured io.threads to " + numIoThreads + ".")
           
           /* the number of queued requests allowed before blocking the network threads */
           val numQueuedRequests = Utils.getIntInRange(props, "max.queued.requests", 500, (1, Int.MaxValue))
        + info("Configured max.queued.requests to " + numQueuedRequests + ".")
         
           /* the interval in which to measure performance statistics */
           val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))
        + info("Configured monitoring.period.secs to " + monitoringPeriodSecs + ".")
           
           /* the default number of log partitions per topic */
           val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))
        + info("Configured num.paritions to " + numPartitions + ".")
           
        - /* the directory in which the log data is kept */
        + /* the directory in which the log data is kept */
           val logDir = Utils.getString(props, "log.dir")
        + info("Configured log.dir to " + logDir + ".")
           
           /* the maximum size of a single log file */
           val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
        + info("Configured log.file.size to " + logFileSize + ".")
           
           /* the number of messages accumulated on a log partition before messages are flushed to disk */
           val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
        + info("Configured log.flush.interval to " + flushInterval + ".")
           
           /* the number of hours to keep a log file before deleting it */
           val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
        + info("Configured log.retention.hours to " + logRetentionHours + ".")
           
           /* the maximum size of the log before deleting it */
           val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
        + info("Configured log.retention.size to " + logRetentionSize + ".")
         
           /* the number of hours to keep a log file before deleting it for some specific topic*/
           val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
        + info("Configured topic.log.retention.hours to " + logRetentionHoursMap.mkString("{",", ","}") + ".")
         
           /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
           val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
        + info("Configured log.cleanup.interval.mins to " + logCleanupIntervalMinutes + ".")
           
           /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
           val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
        + info("Configured topic.flush.intervals.ms to " + flushIntervalMap.mkString("{",", ","}") + ".")
         
           /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
           val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)
        + info("Configured log.default.flush.scheduler.interval.ms to " + flushSchedulerThreadRate + ".")
         
           /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
           val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)
        + info("Configured log.default.flush.interval.ms to " + defaultFlushIntervalMs + ".")
         
            /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
           val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
        + info("Configured topic.partition.count.map to " + topicPartitionsMap.mkString("{",", ","}") + ".")
         
           /* enable auto creation of topic on the server */
           val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
        + info("Configured auto.create.topics to " + autoCreateTopics + ".")
         
           /**
            * Following properties are relevant to Kafka replication
        @@ -100,9 +123,11 @@
         
           /* default replication factors for automatically created topics */
           val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
        + info("Configured default.replication.factor to " + defaultReplicationFactor + ".")
         
           /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
           * leader election on all replicas minus the preferred replica */
           val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
        -
        + info("Configured preferred.replica.wait.time to " + preferredReplicaWaitTime + ".")
        +
          }
        Index: branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
        ===================================================================
        --- branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (revision 1325394)
        +++ branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (working copy)
        @@ -55,7 +55,15 @@
         
           private def registerBrokerInZk() {
             info("Registering broker " + brokerIdPath)
        - val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
        +
        + val hostName = if (config.hostName == null) {
        + val hostName = InetAddress.getLocalHost.getHostAddress
        + logger.info("No hostname configured. Using hostname: " + hostName + " to register this broker in Zookeeper.")
        + hostName
        + } else {
        + config.hostName
        + }
        +
             val creatorId = hostName + "-" + System.currentTimeMillis
             ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
           }
        ]
        Edward Smith made changes -
        Attachment KAFKA-246-broker-0.8.patch [ 12522492 ]
        Edward Smith made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Affects Version/s 0.8 [ 12317244 ]
        Neha Narkhede made changes -
        Field Original Value New Value
        Labels newbie
        Jun Rao created issue -

          People

          • Assignee:
            Unassigned
            Reporter:
            Jun Rao
          • Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development