Index: conf/hbase-site.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- conf/hbase-site.xml (revision 1403148)
+++ conf/hbase-site.xml (revision )
@@ -21,4 +21,25 @@
*/
-->
After construction, use {@link #start} to kick off tracking.
+ *
+ * @param watcher
+ * @param abortable
+ */
+ public ClusterConfigTracker(ZooKeeperWatcher watcher,
+ Abortable abortable, Configuration configuration) {
+ super(watcher, watcher.clusterConfigZNode, abortable);
+ this.configuration = configuration;
+ this.clusterConfiguration = new HBaseInMemoryConfiguration(configuration);
+ }
+
+ @Override
+ public void start() {
+ try {
+ LOG.info("Starting cluster configuration tracker");
+ watcher.registerListener(this);
+ initClusterConfig();
+ } catch (KeeperException e) {
+ LOG.error("ClusterConfigTracker startup failed.", e);
+ abortable.abort("ClusterConfigTracker startup failed", e);
+ } catch (IOException e) {
+ LOG.error("ClusterConfigTracker startup failed.", e);
+ abortable.abort("ClusterConfigTracker startup failed", e);
+ }
+ }
+
+ private boolean isClusterConfigAvailableInZK() throws KeeperException {
+ int checkExists = ZKUtil.checkExists(watcher, watcher.clusterConfigZNode);
+ if (checkExists != -1) {
+ int configNodes = ZKUtil.getNumberOfChildren(watcher, watcher.clusterConfigZNode);
+ byte[] data = ZKUtil.getData(watcher, watcher.clusterConfigZNode);
+ if (data != null && data.length > 0 && configNodes > 0) {
+ // Cluster configuration already exists in ZK.
+ LOG.info("ZK Cluster Config already available. Skipping config ZK node creation");
+ return true;
+ }
+ }
+ LOG.info("ZK Cluster Config node not available. ");
+ return false;
+ }
+
+ private void initClusterConfig() throws KeeperException, IOException {
+
+ if (!isClusterConfigAvailableInZK()) {
+ createClusterConfig();
+ ZKUtil.getChildDataAndWatchForNewChildren(watcher, watcher.clusterConfigZNode);
+ } else {
+ clusterConfiguration.refreshClusterConfigMap(
+ ZKUtil.getChildDataAndWatchForNewChildren(watcher, watcher.clusterConfigZNode));
+ }
+ }
+
+ /**
+ * Get the current cluster configuration map.
+ *
+ * @return Map of current cluster config K,V pairs
+ */
+ public Map getClusterConfigMap() {
+ return this.clusterConfiguration.getClusterConfigMap();
+ }
+
+ /**
+ * Get current in memory cluster configuration
+ *
+ * @return HBaseInMemoryConfiguration
+ */
+ public HBaseInMemoryConfiguration getClusterConfiguration() {
+ return clusterConfiguration;
+ }
+
+ public void setClusterConfiguration(HBaseInMemoryConfiguration clusterConfiguration) {
+ this.clusterConfiguration = clusterConfiguration;
+ }
+
+ /**
+ * Update an existing HBase cluster configuration key. Creates a new configuration node,
+ * if the config does not exist.
+ *
+ * @param configKey
+ * @param configValue
+ */
+ public boolean updateClusterConfig(String configKey, String configValue) {
+ try {
+ if (ZKUtil.checkExists(watcher,
+ getConfigNodePath(configKey)) > 0) {
+ LOG.debug("Updating cluster configuration. Config Key = " + configKey
+ + " Value = " + configValue);
+ ZKUtil.setData(watcher, getConfigNodePath(configKey), Bytes.toBytes(configValue));
+ } else {
+ LOG.debug("Cluster configuration does not exist for Config Key = "
+ + configKey + " Creating new cluster config node");
+ ZKUtil.createSetData(watcher, getConfigNodePath(configKey),
+ Bytes.toBytes(configValue));
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failure during update cluster configuration", e);
+ return false;
+ }
+ return true;
+ }
+
+ private String getConfigNodePath(String configKey) {
+ return ZKUtil.joinZNode(watcher.clusterConfigZNode, configKey);
+ }
+
+ private void createClusterConfig() throws IOException, KeeperException {
+ LOG.info("Creating Cluster Configuration ZK nodes");
+ JsonConfiguration jsonConfiguration = readClusterConfiguration(configuration);
+ JsonProperty[] jsonProperties = jsonConfiguration.getProperties();
+ if (jsonProperties.length > 0) {
+ LOG.info("Number of cluster configuration properties = " + jsonProperties.length);
+ for (JsonProperty jsonProperty : jsonProperties) {
+ ZKUtil.createSetData(watcher, getZKConfigNodeName(jsonProperty.getKey()),
+ Bytes.toBytes(jsonProperty.getValue()));
+ clusterConfiguration.put(jsonProperty.getKey(), jsonProperty.getValue());
+ }
+ ZKUtil.setData(watcher, watcher.clusterConfigZNode, Bytes.toBytes(jsonProperties.length));
+ } else {
+ // This should never happen
+ LOG.error("Current cluster config property set is empty.");
+ }
+ }
+
+
+ private String getZKConfigNodeName(String key) {
+ return ZKUtil.joinZNode(watcher.clusterConfigZNode, key);
+ }
+
+ private JsonConfiguration readClusterConfiguration(Configuration configuration) throws IOException {
+ StringWriter outWriter = new StringWriter();
+ Configuration.dumpConfiguration(configuration, outWriter);
+ String jsonStr = outWriter.toString();
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(jsonStr, JsonConfiguration.class);
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ processConfigChange();
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ processConfigChange();
+ }
+ }
+
+ private void processConfigChange() {
+ try {
+ clusterConfiguration.refreshClusterConfigMap(
+ ZKUtil.getChildDataAndWatchForNewChildren(watcher, watcher.clusterConfigZNode));
+ } catch (KeeperException ke) {
+ LOG.error("Cluster config refresh error ", ke);
+ }
+ }
+
+ @Override
+ public void nodeDataChanged(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ processConfigChange();
+ }
+ }
+
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ processConfigChange();
+ }
+ }
+
+ static class JsonConfiguration {
+ JsonProperty[] properties;
+
+ public JsonProperty[] getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonProperty[] properties) {
+ this.properties = properties;
+ }
+ }
+
+ static class JsonProperty {
+ String key;
+ String value;
+ boolean isFinal;
+ String resource;
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public boolean getIsFinal() {
+ return isFinal;
+ }
+
+ public void setIsFinal(boolean isFinal) {
+ this.isFinal = isFinal;
+ }
+
+ public String getResource() {
+ return resource;
+ }
+
+ public void setResource(String resource) {
+ this.resource = resource;
+ }
+ }
+
+}
+
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision 1403148)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (revision )
@@ -739,7 +739,6 @@
lastCountChange = now;
}
}
-
LOG.info("Finished waiting for region servers count to settle;" +
" checked in " + count + ", slept for " + slept + " ms," +
" expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java (revision 1403148)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java (revision )
@@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
@@ -61,6 +62,10 @@
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.GetConfigResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UpdateConfigRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UpdateConfigResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.GetConfigRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
@@ -308,7 +313,27 @@
*/
@Override
public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
+ throws ServiceException;
+
+ /**
+ * Update cluster configuration.
+ * @param c Controller Unused (set to null).
+ * @param req UpdateConfigRequest
+ * @return
+ * @throws ServiceException
+ */
+ public UpdateConfigResponse updateConfig(RpcController c, UpdateConfigRequest req)
+ throws ServiceException;
+
+ /**
+ * Get current cluster configuration for a given config key.
+ * @param c Controller unused (set to null)
+ * @param req GetConfigRequest
+ * @return
+ * @throws ServiceException
+ */
+ public GetConfigResponse getConfig(RpcController c, GetConfigRequest req)
- throws ServiceException;
+ throws ServiceException;
/**
* Run a scan of the catalog table
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1403148)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision )
@@ -103,6 +103,8 @@
public String splitLogZNode;
// znode containing the state of the load balancer
public String balancerZNode;
+ // znode to monitor online cluster configuration changes
+ public String clusterConfigZNode;
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList