From 275098b4c76595c974aa915316fa6f3b95463360 Mon Sep 17 00:00:00 2001
From: Andrii Biletskyi <andrii.biletskyi@stealth.ly>
Date: Fri, 23 Jan 2015 10:42:06 +0200
Subject: [PATCH] KAFKA-1786 - Global configuration feature for brokers -
 implementation + RQ/RP messages

---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  5 +-
 .../org/apache/kafka/common/protocol/Protocol.java | 21 +++++
 .../common/requests/admin/ConfigChangeRequest.java | 71 +++++++++++++++++
 .../requests/admin/ConfigChangeResponse.java       | 44 +++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 17 +++-
 core/src/main/scala/kafka/Kafka.scala              | 13 +++-
 core/src/main/scala/kafka/admin/AdminUtils.scala   | 61 ++++++++++++++-
 core/src/main/scala/kafka/api/RequestKeys.scala    |  5 +-
 .../kafka/api/admin/ConfigChangeRequest.scala      | 91 ++++++++++++++++++++++
 .../kafka/api/admin/ConfigChangeResponse.scala     | 46 +++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 22 ++++++
 core/src/main/scala/kafka/server/KafkaConfig.scala | 48 ++++++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  1 +
 .../test/scala/unit/kafka/admin/AdminTest.scala    | 52 +++++++++++++
 .../api/RequestResponseSerializationTest.scala     | 14 +++-
 .../kafka/server/KafkaConfigConfigDefTest.scala    | 26 +++++++
 16 files changed, 515 insertions(+), 22 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeRequest.java
 create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeResponse.java
 create mode 100644 core/src/main/scala/kafka/api/admin/ConfigChangeRequest.scala
 create mode 100644 core/src/main/scala/kafka/api/admin/ConfigChangeResponse.scala

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 109fc96..f561103 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -34,7 +34,8 @@ public enum ApiKeys {
     OFFSET_FETCH(9, "offset_fetch"),
     CONSUMER_METADATA(10, "consumer_metadata"),
     JOIN_GROUP(11, "join_group"),
-    HEARTBEAT(12, "heartbeat");
+    HEARTBEAT(12, "heartbeat"),
+    CONFIG_CHANGE(13, "config_change");
 
     private static ApiKeys[] codeToType;
     public static int MAX_API_KEY = -1;
@@ -63,4 +64,4 @@ public enum ApiKeys {
     public static ApiKeys forId(int id) {
         return codeToType[id];
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..044325e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -362,6 +362,25 @@ public class Protocol {
     public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
     public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
+    /* Config Change api */
+
+    public static Schema CONFIG_CHANGE_PAIR_V0 = new Schema(new Field("config_key",
+                                                                      STRING,
+                                                                     "Settings key."),
+                                                            new Field("config_value",
+                                                                      STRING,
+                                                                     "Settings value."));
+
+    public static Schema CONFIG_CHANGE_REQUEST_V0 = new Schema(new Field("config_data",
+                                                                        new ArrayOf(CONFIG_CHANGE_PAIR_V0),
+                                                                        "Overriding configuration map."));
+
+    public static Schema CONFIG_CHANGE_RESPONSE_V0 = new Schema(new Field("error_code",
+                                                                        INT16));
+
+    public static Schema[] CONFIG_CHANGE_REQUEST = new Schema[] {CONFIG_CHANGE_REQUEST_V0};
+    public static Schema[] CONFIG_CHANGE_RESPONSE = new Schema[] {CONFIG_CHANGE_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions */
     public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -381,6 +400,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+        REQUESTS[ApiKeys.CONFIG_CHANGE.id] = CONFIG_CHANGE_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -393,6 +413,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+        RESPONSES[ApiKeys.CONFIG_CHANGE.id] = CONFIG_CHANGE_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeRequest.java
new file mode 100644
index 0000000..ed9eaab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeRequest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests.admin;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequestResponse;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigChangeRequest extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONFIG_CHANGE.id);
+    private static String CONFIG_DATA_KEY_NAME = "config_data";
+
+    private static String CONFIG_KEY_KEY_NAME = "config_key";
+    private static String CONFIG_VALUE_KEY_NAME = "config_value";
+
+    private final Map<String, String> configData;
+
+    public ConfigChangeRequest(Map<String, String> configData) {
+        super(new Struct(curSchema));
+
+        List<Struct> configPairsArray = new ArrayList<Struct>(configData.size());
+        for (Map.Entry<String, String> configEntry : configData.entrySet()) {
+            Struct configDataStruct = struct.instance(CONFIG_DATA_KEY_NAME);
+            configDataStruct.set(CONFIG_KEY_KEY_NAME, configEntry.getKey());
+            configDataStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.getValue());
+
+            configPairsArray.add(configDataStruct);
+        }
+        struct.set(CONFIG_DATA_KEY_NAME, configPairsArray.toArray());
+        this.configData = configData;
+    }
+
+    public ConfigChangeRequest(Struct struct) {
+        super(struct);
+        configData = new HashMap<String, String>();
+        for (Object configStructElem : struct.getArray(CONFIG_DATA_KEY_NAME)) {
+            Struct configStruct = (Struct) configStructElem;
+            String configKey = configStruct.getString(CONFIG_KEY_KEY_NAME);
+            String configValue = configStruct.getString(CONFIG_VALUE_KEY_NAME);
+
+            configData.put(configKey, configValue);
+        }
+    }
+
+    public Map<String, String> configData() {
+        return configData;
+    }
+
+    public static ConfigChangeRequest parse(ByteBuffer buffer) {
+        return new ConfigChangeRequest(((Struct) curSchema.read(buffer)));
+    }
+}
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeResponse.java
new file mode 100644
index 0000000..ae8d87d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/admin/ConfigChangeResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests.admin;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequestResponse;
+
+import java.nio.ByteBuffer;
+
+public class ConfigChangeResponse extends AbstractRequestResponse {
+    public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONFIG_CHANGE.id);
+
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final short errorCode;
+
+    public ConfigChangeResponse(short errorCode) {
+        super(new Struct(curSchema));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        this.errorCode = errorCode;
+    }
+
+    public ConfigChangeResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public static ConfigChangeResponse parse(ByteBuffer buffer) {
+        return new ConfigChangeResponse(((Struct) curSchema.read(buffer)));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index df37fc6..1816965 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,6 +17,8 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.admin.ConfigChangeRequest;
+import org.apache.kafka.common.requests.admin.ConfigChangeResponse;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -52,7 +54,9 @@ public class RequestResponseTest {
                 createOffsetFetchRequest(),
                 createOffsetFetchResponse(),
                 createProduceRequest(),
-                createProduceResponse());
+                createProduceResponse(),
+                createConfigChangeRequest(),
+                createConfigChangeResponse());
 
         for (AbstractRequestResponse req: requestList) {
             ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
@@ -170,4 +174,15 @@ public class RequestResponseTest {
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000));
         return new ProduceResponse(responseData);
     }
+
+    private AbstractRequestResponse createConfigChangeRequest() {
+        Map<String, String> configChange = new HashMap<String, String>();
+        configChange.put("key1", "value1");
+        configChange.put("key2", "");
+        return new ConfigChangeRequest(configChange);
+    }
+
+    private AbstractRequestResponse createConfigChangeResponse() {
+        return new ConfigChangeResponse((short) 0);
+    }
 }
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 37de7df..996164b 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,10 +17,14 @@
 
 package kafka
 
+import kafka.admin.AdminUtils
+import org.I0Itec.zkclient.ZkClient
+
 import scala.collection.JavaConversions._
 import joptsimple.OptionParser
 import metrics.KafkaMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
+import kafka.utils._
 import kafka.utils.{VerifiableProperties, CommandLineUtils, Utils, Logging}
 
 object Kafka extends Logging {
@@ -52,7 +56,12 @@ object Kafka extends Logging {
 
   def main(args: Array[String]): Unit = {
     try {
-      val serverConfig = getKafkaConfigFromArgs(args)
+      val localConfig = getKafkaConfigFromArgs(args)
+      val zkClient = new ZkClient(localConfig.zkConnect, localConfig.zkSessionTimeoutMs,
+        localConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+      val serverConfig = localConfig.applyGlobalProperties(AdminUtils.fetchBrokersConfig(zkClient))
+
       KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps))
       val kafkaServerStartable = new KafkaServerStartable(serverConfig)
 
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 28b12c7..f633e5a 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,6 +20,7 @@ package kafka.admin
 import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
+import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ZkUtils, Json}
 import kafka.api.{TopicMetadata, PartitionMetadata}
 
@@ -217,7 +218,7 @@ object AdminUtils extends Logging {
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
-  
+
   /**
    * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
    * @param zkClient: The ZkClient handle used to write the new config to zookeeper
@@ -235,10 +236,30 @@ object AdminUtils extends Logging {
 
     // write the new config--may not exist if there were previously no overrides
     writeTopicConfig(zkClient, topic, configs)
-    
+
     // create the change notification
     zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
   }
+
+  /**
+   * Update the brokers config for kafka cluster. Before writing configuration the properties are checked whether they can be
+   * applied to the current configuration (correct naming, all types / validation checks pass)
+   * NOTE: will take effect only after server restart.
+   * @param zkClient: The ZkClient handle used to write the new config to zookeeper
+   * @param configs: Configs to merge into the current global broker configuration stored in zookeeper.
+   *
+   */
+  def changeBrokersConfig(zkClient: ZkClient, kafkaConfig: KafkaConfig, configs: Properties) {
+
+    KafkaConfig.validateNames(configs)
+
+    val mergedConfig = fetchBrokersConfig(zkClient)
+    mergedConfig.putAll(configs)
+    // just bootstrap all checks, don't change current configuration
+    kafkaConfig.applyGlobalProperties(mergedConfig)
+
+    writeBrokersConfig(zkClient, mergedConfig)
+  }
   
   /**
    * Write out the topic config to zk, if there is any
@@ -251,6 +272,18 @@ object AdminUtils extends Logging {
     val map = Map("version" -> 1, "config" -> configMap)
     ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
   }
+
+  /**
+   * Write (overwrite) out the broker config to zk, if there is any
+   */
+  private def writeBrokersConfig(zkClient: ZkClient, config: Properties) {
+    val configMap: mutable.Map[String, String] = {
+      import JavaConversions._
+      config
+    }
+    val map = Map("version" -> 1, "config" -> configMap)
+    ZkUtils.updatePersistentPath(zkClient, ZkUtils.BrokersConfigPath, Json.encode(map))
+  }
   
   /**
    * Read the topic config (if any) from zk
@@ -276,6 +309,30 @@ object AdminUtils extends Logging {
     props
   }
 
+  /**
+   * Read the brokers config from zk
+   */
+  def fetchBrokersConfig(zkClient: ZkClient): Properties = {
+    val str: String = zkClient.readData(ZkUtils.BrokersConfigPath, true)
+    val props = new Properties()
+    if(str != null) {
+      Json.parseFull(str) match {
+        case None => // there are no config overrides
+        case Some(map: Map[String, _]) =>
+          require(map("version") == 1)
+          map.get("config") match {
+            case Some(config: Map[String, String]) =>
+              for((k,v) <- config)
+                props.setProperty(k, v)
+            case _ => throw new IllegalArgumentException("Invalid brokers config: " + str)
+          }
+
+        case o => throw new IllegalArgumentException("Unexpected value in config: "  + str)
+      }
+    }
+    props
+  }
+
   def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
     ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
 
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c24c034..ae5f76a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import kafka.api.admin.ConfigChangeRequest
 import kafka.common.KafkaException
 import java.nio.ByteBuffer
 
@@ -34,6 +35,7 @@ object RequestKeys {
   val ConsumerMetadataKey: Short = 10
   val JoinGroupKey: Short = 11
   val HeartbeatKey: Short = 12
+  val ConfigChangeKey: Short = 13
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -48,7 +50,8 @@ object RequestKeys {
         OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
         ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
         JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
-        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
+        HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom),
+        ConfigChangeKey -> ("ConfigChange", ConfigChangeRequest.readFrom)
     )
 
   def nameForKey(key: Short): String = {
diff --git a/core/src/main/scala/kafka/api/admin/ConfigChangeRequest.scala b/core/src/main/scala/kafka/api/admin/ConfigChangeRequest.scala
new file mode 100644
index 0000000..95f00f1
--- /dev/null
+++ b/core/src/main/scala/kafka/api/admin/ConfigChangeRequest.scala
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api.admin
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.api.{RequestKeys, RequestOrResponse}
+import kafka.common.ErrorMapping
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+
+object ConfigChangeRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultCorrelationId = 0
+
+  def readFrom(buffer: ByteBuffer): ConfigChangeRequest = {
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    val configElemsCount = buffer.getInt
+    val configChangeData =
+      List.fill(configElemsCount) {
+        val configKey = readShortString(buffer)
+        val configValue = readShortString(buffer)
+        (configKey, configValue)
+      }.toMap
+    ConfigChangeRequest(versionId, correlationId, clientId, configChangeData)
+  }
+}
+
+case class ConfigChangeRequest(versionId: Short = ConfigChangeRequest.CurrentVersion,
+                               correlationId: Int = ConfigChangeRequest.DefaultCorrelationId,
+                               clientId: String,
+                               configChangeData: Map[String, String]) extends RequestOrResponse(Some(RequestKeys.ConfigChangeKey)) {
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(configChangeData.size)
+    configChangeData.foreach {
+      case (key, value) =>
+        writeShortString(buffer, key)
+        writeShortString(buffer, value)
+    }
+  }
+
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+      4 + /* correlationId */
+      shortStringLength(clientId) +
+      4 + /* configChangeData map size */
+      configChangeData.map { case (k, v) => shortStringLength(k) + shortStringLength(v)}.sum /* configChangeData */
+  }
+
+  override def toString: String = {
+    describe(true)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = ConfigChangeResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), correlationId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  override def describe(details: Boolean): String = {
+    val fetchRequest = new StringBuilder
+    fetchRequest.append("Name: " + this.getClass.getSimpleName)
+    fetchRequest.append("; Version: " + versionId)
+    fetchRequest.append("; CorrelationId: " + correlationId)
+    fetchRequest.append("; ClientId: " + clientId)
+    if (details)
+      fetchRequest.append("; ConfigChangeData: " + configChangeData.mkString(","))
+    fetchRequest.toString()
+  }
+
+}
diff --git a/core/src/main/scala/kafka/api/admin/ConfigChangeResponse.scala b/core/src/main/scala/kafka/api/admin/ConfigChangeResponse.scala
new file mode 100644
index 0000000..adef555
--- /dev/null
+++ b/core/src/main/scala/kafka/api/admin/ConfigChangeResponse.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api.admin
+
+import java.nio.ByteBuffer
+
+import kafka.api.RequestOrResponse
+
+object ConfigChangeResponse {
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+
+    ConfigChangeResponse(errorCode, correlationId)
+  }
+}
+
+case class ConfigChangeResponse(errorCode: Short, correlationId: Int = 0)
+  extends RequestOrResponse() {
+
+
+  def sizeInBytes =
+    4 + /* correlationId */
+      2 /* error code */
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+  }
+
+  def describe(details: Boolean) = toString
+}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ec8d9f7..79a82a9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,7 +17,10 @@
 
 package kafka.server
 
+import java.util.Properties
+
 import kafka.api._
+import kafka.api.admin.{ConfigChangeResponse, ConfigChangeRequest}
 import kafka.common._
 import kafka.log._
 import kafka.network._
@@ -62,6 +65,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+        case RequestKeys.ConfigChangeKey => handleConfigChangeRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -442,6 +446,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  /*
+   * Handle a brokers config change request
+   */
+  def handleConfigChangeRequest(request: RequestChannel.Request) {
+    val configChangeRequest = request.requestObj.asInstanceOf[ConfigChangeRequest]
+
+    val props = new Properties()
+    for ((k, v) <- configChangeRequest.configChangeData)
+      props.put(k, v)
+
+    AdminUtils.changeBrokersConfig(zkClient, config, props)
+    val response = ConfigChangeResponse(0, configChangeRequest.correlationId)
+
+    trace("Sending config change response %s for correlation id %d to client %s."
+      .format(response, configChangeRequest.correlationId, configChangeRequest.clientId))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
   def close() {
     // TODO currently closing the API is an no-op since the API no longer maintain any modules
     // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 05d689a..32bf425 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,9 @@
 package kafka.server
 
 import java.util.Properties
+import kafka.utils.Logging
 
+import scala.collection.mutable
 import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.Utils
@@ -459,6 +461,11 @@ object KafkaConfig {
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
   }
 
+  /**
+   * A set of properties that are specific from broker and thus cannot be overridden by global config
+   */
+  private val BrokerSpecificProperties = Set(BrokerIdProp, HostNameProp, PortProp, AdvertisedHostNameProp, AdvertisedPortProp)
+
   def configNames() = {
     import scala.collection.JavaConversions._
     configDef.names().toList.sorted
@@ -579,7 +586,7 @@ object KafkaConfig {
   }
 
   /**
-   * Create a log config instance using the given properties and defaults
+   * Create a kafka config instance using the given properties and defaults
    */
   def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
     val props = new Properties(defaults)
@@ -597,16 +604,6 @@ object KafkaConfig {
       require(names.contains(name), "Unknown configuration \"%s\".".format(name))
   }
 
-  /**
-   * Check that the given properties contain only valid kafka config names and that all values can be parsed and are valid
-   */
-  def validate(props: Properties) {
-    validateNames(props)
-    configDef.parse(props)
-
-    // to bootstrap KafkaConfig.validateValues()
-    KafkaConfig.fromProps(props)
-  }
 }
 
 class KafkaConfig(/** ********* Zookeeper Configuration ***********/
@@ -716,7 +713,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
 
                   val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable,
                   val compressionType: String = Defaults.CompressionType
-                   ) {
+                   ) extends Logging {
 
   val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs)
 
@@ -784,6 +781,31 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
   }
 
+  /**
+   * Create a new KafkaConfig instance with global properties overriding local config. Note that properties that are broker
+   * specific (see [[KafkaConfig.BrokerSpecificProperties]]) won't be overridden
+   * @param global some of the KafkaConfig settings that will override local config
+   * @return A new KafkaConfig with global settings taken precedence over current (this) configuration
+   */
+  def applyGlobalProperties(global: Properties): KafkaConfig = {
+    val propsWithOverrides = new Properties()
+    propsWithOverrides.putAll(this.toProps)
+
+    val overridesKeys: mutable.Set[String] = {
+      import JavaConversions._
+      global.stringPropertyNames()
+    }
+
+    for (k <- overridesKeys;
+         key = k.trim
+         if !KafkaConfig.BrokerSpecificProperties.contains(key)) {
+      logger.info("Configuration key=%s will be overridden with Global Configuration value=%s.".format(key, global.get(key)))
+      propsWithOverrides.put(key, global.get(key))
+    }
+
+    KafkaConfig.fromProps(propsWithOverrides)
+  }
+
   def toProps: Properties = {
     val props = new Properties()
     import kafka.server.KafkaConfig._
@@ -895,4 +917,4 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
 
     props
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c14bd45..426fb3b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -46,6 +46,7 @@ object ZkUtils extends Logging {
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val BrokersConfigPath = "/brokers/config"
   val BrokerSequenceIdPath = "/brokers/seqid"
 
   def getTopicPath(topic: String): String = {
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index ee0b21e..1774f27 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -17,6 +17,7 @@
 package kafka.admin
 
 import junit.framework.Assert._
+import org.apache.kafka.common.config.ConfigException
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
@@ -401,4 +402,55 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
   }
 
+  @Test
+  def testBrokersValidConfigChange() {
+
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
+
+    val messageSize = kafka.server.Defaults.MessageMaxBytes + 1000
+    val controllerSocketTimeout = kafka.server.Defaults.ControllerSocketTimeoutMs
+    val controlledShutdownMaxRetries = kafka.server.Defaults.ControlledShutdownMaxRetries + 5
+
+    try {
+      // change config
+      val configV1 = new Properties()
+      configV1.setProperty(KafkaConfig.MessageMaxBytesProp, messageSize.toString)
+      configV1.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp, controllerSocketTimeout.toString)
+      AdminUtils.changeBrokersConfig(server.zkClient, server.config, configV1)
+      // check config
+      val fetchedConfig = AdminUtils.fetchBrokersConfig(server.zkClient)
+      assertEquals(messageSize.toString, fetchedConfig.getProperty(KafkaConfig.MessageMaxBytesProp))
+      assertEquals(controllerSocketTimeout.toString, fetchedConfig.getProperty(KafkaConfig.ControllerSocketTimeoutMsProp))
+      assertEquals(2, fetchedConfig.size())
+
+      // add config
+      val configV2  = new Properties()
+      configV2.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString)
+      AdminUtils.changeBrokersConfig(server.zkClient, server.config, configV2)
+      // check that the stored config accumulates changes from both updates (V1 entries plus the V2 entry)
+      val fetchedConfigV2 = AdminUtils.fetchBrokersConfig(server.zkClient)
+      assertEquals(messageSize.toString, fetchedConfigV2.getProperty(KafkaConfig.MessageMaxBytesProp))
+      assertEquals(controllerSocketTimeout.toString, fetchedConfigV2.getProperty(KafkaConfig.ControllerSocketTimeoutMsProp))
+      assertEquals(controlledShutdownMaxRetries.toString, fetchedConfigV2.getProperty(KafkaConfig.ControlledShutdownMaxRetriesProp))
+      assertEquals(3, fetchedConfigV2.size())
+
+    } finally {
+      server.shutdown()
+    }
+  }
+
+  @Test
+  def testBrokersInvalidConfigChange() {
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
+    try {
+      val config = new Properties()
+      config.setProperty(KafkaConfig.MessageMaxBytesProp, (-100).toString)
+      intercept[ConfigException] {
+        AdminUtils.changeBrokersConfig(server.zkClient, server.config, config)
+      }
+    } finally {
+      server.shutdown()
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..8bb5351 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import kafka.api.admin.{ConfigChangeResponse, ConfigChangeRequest}
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
@@ -216,6 +217,14 @@ object SerializationTestUtils {
     val body = new JoinGroupResponse(0.asInstanceOf[Short], 1, "consumer1", List(new TopicPartition("test11", 1)))
     JoinGroupResponseAndHeader(1, body)
   }
+
+  def createConfigChangeRequest: ConfigChangeRequest = {
+    ConfigChangeRequest(0, 1, "test", Map("key1" -> "value2", "key2" -> ""))
+  }
+
+  def createConfigChangeResponse: ConfigChangeResponse = {
+    ConfigChangeResponse(ErrorMapping.NoError)
+  }
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -242,6 +251,8 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
   private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
   private val joinGroupResponse = SerializationTestUtils.createJoinGroupResponseAndHeader
+  private val configChangeRequest = SerializationTestUtils.createConfigChangeRequest
+  private val configChangeResponse = SerializationTestUtils.createConfigChangeResponse
 
   @Test
   def testSerializationAndDeserialization() {
@@ -254,7 +265,8 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
                                consumerMetadataResponseNoCoordinator, heartbeatRequest,
-                               heartbeatResponse, joinGroupRequest, joinGroupResponse)
+                               heartbeatResponse, joinGroupRequest, joinGroupResponse,
+                               configChangeRequest, configChangeResponse)
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 7cfba6a..d5e60da 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -391,6 +391,32 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     })
   }
 
+  @Test
+  def testApplyGlobalConfig(): Unit = {
+    val kafkaConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181")
+    kafkaConfig.toProps
+
+    val globalProperties = new Properties()
+    // Broker specific properties
+    globalProperties.put(KafkaConfig.BrokerIdProp, (kafkaConfig.brokerId + 1).toString)
+    globalProperties.put(KafkaConfig.HostNameProp, kafkaConfig.hostName + "a")
+    globalProperties.put(KafkaConfig.PortProp, (kafkaConfig.port + 1).toString)
+    globalProperties.put(KafkaConfig.AdvertisedHostNameProp, kafkaConfig.advertisedHostName + "a")
+    globalProperties.put(KafkaConfig.AdvertisedPortProp, (kafkaConfig.advertisedPort + 1).toString)
+    // Non-broker specific
+    globalProperties.put(KafkaConfig.AutoCreateTopicsEnableProp, (!kafkaConfig.autoCreateTopicsEnable).toString)
+    val newConfig = kafkaConfig.applyGlobalProperties(globalProperties)
+
+    Assert.assertFalse("applyGlobalProperties shouldn't change original KafkaConfig",
+      newConfig.autoCreateTopicsEnable == kafkaConfig.autoCreateTopicsEnable)
+
+    Assert.assertEquals(kafkaConfig.brokerId, newConfig.brokerId)
+    Assert.assertEquals(kafkaConfig.hostName, newConfig.hostName)
+    Assert.assertEquals(kafkaConfig.port, newConfig.port)
+    Assert.assertEquals(kafkaConfig.advertisedHostName, newConfig.advertisedHostName)
+    Assert.assertEquals(kafkaConfig.advertisedPort, newConfig.advertisedPort)
+  }
+
   private def randFrom[T](choices: T*): T = {
     import scala.util.Random
     choices(Random.nextInt(choices.size))
-- 
1.9.1

