Index: src/test/java/org/apache/hadoop/hbase/TestHBaseDynamicConfiguration.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/test/java/org/apache/hadoop/hbase/TestHBaseDynamicConfiguration.java (revision )
+++ src/test/java/org/apache/hadoop/hbase/TestHBaseDynamicConfiguration.java (revision )
@@ -0,0 +1,266 @@
+package org.apache.hadoop.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.zookeeper.ClusterConfigTracker;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+//@Category(MediumTests.class)
+public class TestHBaseDynamicConfiguration {
+
+ final Log LOG = LogFactory.getLog(getClass());
+ protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected static MiniHBaseCluster miniHBaseCluster = null;
+
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean("hbase.dynamic.configuration.enabled", true);
+ miniHBaseCluster = TEST_UTIL.startMiniCluster(1, 1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testConfigurationChangesWhileMasterCrash() throws IOException, InterruptedException {
+ LOG.info("Start testConfigurationChangesWhileMasterCrash");
+
+ Configuration clusterConfig = TEST_UTIL.getConfiguration();
+ int balancerPeriod = clusterConfig.getInt("hbase.balancer.period", 10);
+ assertEquals(300000, balancerPeriod);
+ int handlerCount = clusterConfig.getInt("hbase.master.handler.count", 30);
+ assertEquals(30, handlerCount);
+ long rsFatalsBuffer = clusterConfig.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024);
+ assertEquals(1 * 1024 * 1024, rsFatalsBuffer);
+ int catalogTimeout = clusterConfig.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE, catalogTimeout);
+ int catalogVerifyTimeout = clusterConfig.getInt("hbase.catalog.verification.timeout", 1000);
+ assertEquals(1000, catalogVerifyTimeout);
+
+ ClusterConfigTracker cct = miniHBaseCluster.getMaster().getClusterConfigTracker();
+ updateClusterConfig(cct);
+
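+ // Give the ZK watches time to fire and the in-memory configs to refresh.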
+ Thread.sleep(5000);
+ HBaseInMemoryConfiguration masterConfig = (HBaseInMemoryConfiguration)
+ miniHBaseCluster.getMaster().getConfiguration();
+ assertConfigChanges(masterConfig);
+
+ // Start a backup master, then abort the active one to force a failover.
+ TEST_UTIL.getHBaseCluster().startMaster();
+ Thread.sleep(10000);
+
+ HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+ activeMaster.abort("Aborting primary master to test dynamic configuration overrides.",
+ new Exception("Die now"));
+ Thread.sleep(15000);
+
+ HMaster secondMaster = TEST_UTIL.getHBaseCluster().getMaster();
+ HBaseInMemoryConfiguration secondMasterConfig =
+ (HBaseInMemoryConfiguration) secondMaster.getConfiguration();
+ assertConfigChanges(secondMasterConfig);
+
+ HRegionServer hRegionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+ HBaseInMemoryConfiguration regionConfig =
+ (HBaseInMemoryConfiguration) hRegionServer.getConfiguration();
+ assertConfigChanges(regionConfig);
+
+ LOG.info("End testConfigurationChangesWhileMasterCrash");
+ }
+
+ private void updateClusterConfig(ClusterConfigTracker cct) {
+ cct.updateClusterConfig("hbase.balancer.period", "200000");
+ cct.updateClusterConfig("hbase.master.handler.count", "30");
+ cct.updateClusterConfig("hbase.master.buffer.for.rs.fatals", Long.toString(2*1024*1024));
+ cct.updateClusterConfig("hbase.master.catalog.timeout", Integer.toString(Integer.MAX_VALUE-100));
+ cct.updateClusterConfig("hbase.catalog.verification.timeout", Long.toString(2000));
+ }
+
+ private void assertConfigChanges(Configuration configuration) {
+ int balancerPeriod = configuration.getInt("hbase.balancer.period", 40000);
+ LOG.info("Balancer period from configuration under test = " + balancerPeriod);
+ assertEquals(200000, balancerPeriod);
+ int handlerCount = configuration.getInt("hbase.master.handler.count", 20);
+ assertEquals(30, handlerCount);
+ long rsFatalsBuffer = configuration.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024);
+ assertEquals(2 * 1024 * 1024, rsFatalsBuffer);
+ int catalogTimeout = configuration.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE - 100, catalogTimeout);
+ int catalogVerifyTimeout = configuration.getInt("hbase.catalog.verification.timeout", 1000);
+ assertEquals(2000, catalogVerifyTimeout);
+ }
+
+ @Test
+ public void testDumpConfiguration() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ StringWriter outWriter = new StringWriter();
+ Configuration.dumpConfiguration(conf, outWriter);
+ String jsonStr = outWriter.toString();
+ ObjectMapper mapper = new ObjectMapper();
+ JsonConfiguration jconf = mapper.readValue(jsonStr, JsonConfiguration.class);
+ JsonProperty[] props = jconf.getProperties();
+ assertNotNull(props);
+ LOG.info("Dumped configuration property count = " + props.length);
+ assertTrue(props.length > 0);
+ }
+
+ @Test
+ public void testConfigMemoryOverride() throws Exception {
+ LOG.info("Start testConfigMemoryOverride");
+
+ Configuration clusterConfig = TEST_UTIL.getConfiguration();
+ int balancerPeriod = clusterConfig.getInt("hbase.balancer.period", 10);
+ assertEquals(300000, balancerPeriod);
+ ClusterConfigTracker cct = miniHBaseCluster.getMaster().getClusterConfigTracker();
+ cct.updateClusterConfig("hbase.balancer.period", "200000");
+ Thread.sleep(1000);
+ HBaseInMemoryConfiguration masterConfig = (HBaseInMemoryConfiguration)
+ miniHBaseCluster.getMaster().getConfiguration();
+ int newBalancerPeriod = masterConfig.getInt("hbase.balancer.period", 40000);
+ assertEquals(200000, newBalancerPeriod);
+ LOG.info("End testConfigMemoryOverride");
+ }
+
+ //@Test
+ public void testGetClusterConfigMap() throws Exception {
+ LOG.info("Start testGetClusterConfigMap");
+ ClusterConfigTracker cct = miniHBaseCluster.getMaster().getClusterConfigTracker();
+ Map configMap = cct.getClusterConfigMap();
+ assertNotNull(configMap);
+ assertTrue(configMap.size() > 0);
+ LOG.info("End testGetClusterConfigMap");
+ }
+
+ @Test
+ public void testConfigAddition() throws Exception {
+ LOG.info("Start testConfigAddition");
+
+ ClusterConfigTracker cct = miniHBaseCluster.getMaster().getClusterConfigTracker();
+ int sizeBefore = cct.getClusterConfigMap().size();
+ cct.createClusterConfig("hbase.config.test.TestConfigKey", "TestConfigValue");
+ Thread.sleep(1000);
+ int sizeAfter = cct.getClusterConfigMap().size();
+ assertEquals(sizeBefore + 1, sizeAfter);
+ String configValue =
+ (String) cct.getClusterConfigMap().get("hbase.config.test.TestConfigKey");
+ assertEquals(configValue, "TestConfigValue");
+ LOG.info("End testConfigAddition");
+ }
+
+ @Test
+ public void testConfigDuplicateAddition() throws Exception {
+ LOG.info("Start testConfigDuplicateAddition");
+
+ ClusterConfigTracker cct = miniHBaseCluster.getMaster().getClusterConfigTracker();
+ int sizeBefore = cct.getClusterConfigMap().size();
+ cct.createClusterConfig("hbase.config.test.TestConfigKey", "TestConfigValue");
+ Thread.sleep(1000);
+ int sizeAfter = cct.getClusterConfigMap().size();
+ assertEquals(sizeBefore + 1, sizeAfter);
+ String configValue =
+ (String) cct.getClusterConfigMap().get("hbase.config.test.TestConfigKey");
+ assertEquals("TestConfigValue", configValue);
+ // Now try to create the same config again. This should simply update the config
+ // value
+ cct.createClusterConfig("hbase.config.test.TestConfigKey", "TestConfigValue201");
+ Thread.sleep(1000);
+ sizeAfter = cct.getClusterConfigMap().size();
+ assertEquals(sizeBefore + 1, sizeAfter);
+ configValue =
+ (String) cct.getClusterConfigMap().get("hbase.config.test.TestConfigKey");
+ assertEquals("TestConfigValue201", configValue);
+ LOG.info("End testConfigDuplicateAddition");
+ }
+
+ @Test
+ public void testConfigUpdate() throws Exception {
+ LOG.info("Start testConfigUpdate");
+
+ ClusterConfigTracker cct = miniHBaseCluster.getMaster().getClusterConfigTracker();
+ int sizeBefore = cct.getClusterConfigMap().size();
+ cct.createClusterConfig("hbase.config.test.TestConfigKey", "TestConfigValue");
+ Thread.sleep(1000);
+ int sizeAfter = cct.getClusterConfigMap().size();
+ assertEquals(sizeBefore + 1, sizeAfter);
+ String configValue =
+ (String) cct.getClusterConfigMap().get("hbase.config.test.TestConfigKey");
+ assertEquals("TestConfigValue", configValue);
+ sizeBefore = cct.getClusterConfigMap().size();
+ cct.updateClusterConfig("hbase.config.test.TestConfigKey", "TestConfigValue101");
+ Thread.sleep(1000);
+ sizeAfter = cct.getClusterConfigMap().size();
+ // No change in config size map since we are only updating existing value
+ assertEquals(sizeBefore, sizeAfter);
+ configValue =
+ (String) cct.getClusterConfigMap().get("hbase.config.test.TestConfigKey");
+ assertEquals(configValue, "TestConfigValue101");
+ LOG.info("End testConfigUpdate");
+ }
+
+ static class JsonConfiguration {
+ JsonProperty[] properties;
+
+ public JsonProperty[] getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonProperty[] properties) {
+ this.properties = properties;
+ }
+ }
+
+ static class JsonProperty {
+ String key;
+ String value;
+ boolean isFinal;
+ String resource;
+
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public String getValue() {
+ return value;
+ }
+ public void setValue(String value) {
+ this.value = value;
+ }
+ public boolean getIsFinal() {
+ return isFinal;
+ }
+ public void setIsFinal(boolean isFinal) {
+ this.isFinal = isFinal;
+ }
+ public String getResource() {
+ return resource;
+ }
+ public void setResource(String resource) {
+ this.resource = resource;
+ }
+ }
+
+}
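Note: the tests above cast server configurations to HBaseInMemoryConfiguration, a class this patch introduces but which is not visible in this section. A minimal sketch of what it presumably looks like, assuming it overlays the ZooKeeper-backed config map on a plain Configuration; only the constructor, put, getClusterConfigMap, and refreshClusterConfigMap signatures are taken from calls elsewhere in the patch, everything else is an assumption:

    package org.apache.hadoop.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only -- not the patch's actual implementation.
    public class HBaseInMemoryConfiguration extends Configuration {

      // ZK-backed overrides, consulted before the file-based values.
      private final Map<String, String> clusterConfigMap =
          new ConcurrentHashMap<String, String>();

      public HBaseInMemoryConfiguration(Configuration conf) {
        super(conf);
      }

      // Caveat: not all typed getters (getInt, getLong, ...) in Hadoop's
      // Configuration route through get(String), so the real class likely
      // overrides more than this.
      @Override
      public String get(String name) {
        String override = clusterConfigMap.get(name);
        return override != null ? override : super.get(name);
      }

      public void put(String key, String value) {
        clusterConfigMap.put(key, value);
      }

      public Map<String, String> getClusterConfigMap() {
        return clusterConfigMap;
      }

      // Replace the overrides with the current children of the config znode.
      public void refreshClusterConfigMap(List<NodeAndData> nodes) {
        clusterConfigMap.clear();
        for (NodeAndData node : nodes) {
          String key = ZKUtil.getNodeName(node.getNode());
          clusterConfigMap.put(key, Bytes.toString(node.getData()));
        }
      }
    }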
Index: src/main/ruby/shell.rb
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/ruby/shell.rb (revision 1298238)
+++ src/main/ruby/shell.rb (revision )
@@ -270,6 +270,8 @@
unassign
zk_dump
hlog_roll
+ update_config
+ get_config
]
)
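The new update_config and get_config shell commands imply corresponding command classes under src/main/ruby/shell/commands and matching RPC methods on the master. The @Override annotations in the HMaster hunk below confirm the master interface grows two methods; the declarations in HMasterInterface presumably look roughly like this (javadoc wording is an assumption):

    // Presumed additions to org.apache.hadoop.hbase.ipc.HMasterInterface,
    // implied by the @Override methods in the HMaster hunk below.

    /**
     * Update (or create) a cluster configuration key in ZooKeeper.
     * @return true if the update was applied
     */
    public boolean updateConfig(String configKey, String configValue);

    /**
     * @return the currently effective value of the given configuration key
     */
    public String getConfig(String configKey);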
Index: src/main/resources/hbase-default.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/resources/hbase-default.xml (revision 1309612)
+++ src/main/resources/hbase-default.xml (revision )
@@ -893,5 +893,14 @@
    In both cases, the aggregated metric M across tables and cfs will be reported.
+
+  <property>
+    <name>hbase.dynamic.configuration.enabled</name>
+    <value>false</value>
+    <description>Enables runtime (dynamic) cluster configuration changes
+    through ZooKeeper and the HBase shell, without a config file edit
+    and cluster restart.</description>
+  </property>
+
Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterConfigTracker.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterConfigTracker.java (revision )
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ClusterConfigTracker.java (revision )
@@ -0,0 +1,262 @@
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInMemoryConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Tracks the cluster configuration znode in ZooKeeper and keeps an in-memory
+ * copy of the cluster configuration in sync with it.
+ */
+public class ClusterConfigTracker extends ZooKeeperNodeTracker {
+
+ private static final Log LOG = LogFactory.getLog(ClusterConfigTracker.class);
+ private final Configuration configuration;
+ private HBaseInMemoryConfiguration clusterConfiguration;
+
+ /**
+ * Constructs the tracker.
+ * After construction, use {@link #start} to kick off tracking.
+ *
+ * @param watcher zookeeper watcher to register listeners with
+ * @param abortable server to abort if tracker startup fails
+ * @param configuration the bootstrap, file-based configuration
+ */
+ public ClusterConfigTracker(ZooKeeperWatcher watcher,
+ Abortable abortable, Configuration configuration) {
+ super(watcher, watcher.clusterConfigZNode, abortable);
+ this.configuration = configuration;
+ this.clusterConfiguration = new HBaseInMemoryConfiguration(configuration);
+ }
+
+ @Override
+ public void start() {
+ try {
+ LOG.info("Starting cluster configuration tracker");
+ watcher.registerListener(this);
+ initClusterConfig();
+ } catch (KeeperException e) {
+ LOG.error("ClusterConfigTracker startup failed.", e);
+ abortable.abort("ClusterConfigTracker startup failed", e);
+ } catch (IOException e) {
+ LOG.error("ClusterConfigTracker startup failed.", e);
+ abortable.abort("ClusterConfigTracker startup failed", e);
+ }
+ }
+
+ private boolean hasClusterConfigAvailableInZK() throws KeeperException {
+ int checkExists = ZKUtil.checkExists(watcher, watcher.clusterConfigZNode);
+ if (checkExists != -1) {
+ int configNodes = ZKUtil.getNumberOfChildren(watcher, watcher.clusterConfigZNode);
+ byte[] data = ZKUtil.getData(watcher, watcher.clusterConfigZNode);
+ if (data != null && data.length > 0 && configNodes > 0) {
+ // Cluster configuration already exists in ZK.
+ LOG.info("ZK Cluster Config already available. Skipping config ZK creation");
+ return true;
+ }
+ }
+ LOG.info("ZK Cluster Config not available. ");
+ return false;
+ }
+
+ private void initClusterConfig() throws KeeperException, IOException {
+
+ if (!hasClusterConfigAvailableInZK()) {
+ createClusterConfig();
+ ZKUtil.getChildDataAndWatchForNewChildren(watcher, watcher.clusterConfigZNode);
+ } else {
+ clusterConfiguration.refreshClusterConfigMap(
+ ZKUtil.getChildDataAndWatchForNewChildren(watcher, watcher.clusterConfigZNode));
+ }
+ }
+
+ /**
+ * Get the current cluster configuration map.
+ * @return the current cluster configuration map
+ */
+ public Map getClusterConfigMap() {
+ return this.clusterConfiguration.getClusterConfigMap();
+ }
+
+ public HBaseInMemoryConfiguration getClusterConfiguration() {
+ return clusterConfiguration;
+ }
+
+ public void setClusterConfiguration(HBaseInMemoryConfiguration clusterConfiguration) {
+ this.clusterConfiguration = clusterConfiguration;
+ }
+
+ /**
+ * Update an existing HBase cluster configuration key. Creates a new configuration node
+ * if the config does not exist.
+ * @param configKey the configuration key to update
+ * @param configValue the new value
+ * @return true if the update succeeded
+ */
+ public boolean updateClusterConfig(String configKey, String configValue) {
+ try {
+ if (ZKUtil.checkExists(watcher,
+ getConfigNodePath(configKey)) != -1) {
+ LOG.info("Updating cluster configuration. Config Key = " + configKey
+ + " Value = " + configValue);
+ ZKUtil.setData(watcher, getConfigNodePath(configKey), Bytes.toBytes(configValue));
+ } else {
+ LOG.info("Cluster configuration does not exist for Config Key = "
+ + configKey + " Creating new cluster config node");
+ ZKUtil.createSetData(watcher, getConfigNodePath(configKey),
+ Bytes.toBytes(configValue));
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failure during update cluster configuration", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Create a new HBase cluster configuration key. Updates the configuration if the
+ * configuration node already exists.
+ * @param configKey the configuration key to create
+ * @param configValue the initial value
+ */
+ public void createClusterConfig(String configKey, String configValue) {
+ try {
+ LOG.info("Creating cluster configuration. Config Key = " + configKey
+ + " Value = " + configValue);
+ if (ZKUtil.checkExists(watcher,
+ getConfigNodePath(configKey)) != -1) {
+ LOG.info("Cluster configuration node exists for Config Key = "
+ + configKey + " Updating the cluster config node");
+ ZKUtil.setData(watcher, getConfigNodePath(configKey), Bytes.toBytes(configValue));
+ } else {
+ ZKUtil.createSetData(watcher, getConfigNodePath(configKey),
+ Bytes.toBytes(configValue));
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failure during cluster config creation in ZK. ", e);
+ }
+ }
+
+ private String getConfigNodePath(String configKey) {
+ return ZKUtil.joinZNode(watcher.clusterConfigZNode, configKey);
+ }
+
+ private void createClusterConfig() throws IOException, KeeperException {
+ LOG.info("Creating Cluster Configuration ZK nodes");
+ JsonConfiguration jsonConfiguration = readClusterConfiguration(configuration);
+ JsonProperty[] jsonProperties = jsonConfiguration.getProperties();
+ if (jsonProperties != null && jsonProperties.length > 0) {
+ LOG.info("Number of cluster configuration properties = " + jsonProperties.length);
+ for (JsonProperty jsonProperty : jsonProperties) {
+ ZKUtil.createSetData(watcher, getConfigNodePath(jsonProperty.getKey()),
+ Bytes.toBytes(jsonProperty.getValue()));
+ clusterConfiguration.put(jsonProperty.getKey(), jsonProperty.getValue());
+ }
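+ // Record the number of published config children on the parent znode;
+ // its non-empty data is the "config exists" marker checked at startup.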
+ ZKUtil.setData(watcher, watcher.clusterConfigZNode, Bytes.toBytes(jsonProperties.length));
+ } else {
+ LOG.error("Cannot load cluster configuration properties to ZK.");
+ }
+ }
+
+
+
+ private JsonConfiguration readClusterConfiguration(Configuration configuration) throws IOException {
+ StringWriter outWriter = new StringWriter();
+ Configuration.dumpConfiguration(configuration, outWriter);
+ String jsonStr = outWriter.toString();
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(jsonStr, JsonConfiguration.class);
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ LOG.info("ClusterConfigTracker.nodeCreated. Path = " + path);
+ processConfigChange();
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ LOG.info("ClusterConfigTracker.nodeDeleted. Path = " + path);
+ processConfigChange();
+ }
+ }
+
+ private void processConfigChange() {
+ try {
+ clusterConfiguration.refreshClusterConfigMap(
+ ZKUtil.getChildDataAndWatchForNewChildren(watcher, watcher.clusterConfigZNode));
+ } catch (KeeperException ke) {
+ LOG.error("Cluster config refresh error ", ke);
+ }
+ }
+
+ @Override
+ public void nodeDataChanged(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ processConfigChange();
+ }
+ }
+
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.startsWith(watcher.clusterConfigZNode)) {
+ processConfigChange();
+ }
+ }
+
+ static class JsonConfiguration {
+ JsonProperty[] properties;
+
+ public JsonProperty[] getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonProperty[] properties) {
+ this.properties = properties;
+ }
+ }
+
+ static class JsonProperty {
+ String key;
+ String value;
+ boolean isFinal;
+ String resource;
+
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public String getValue() {
+ return value;
+ }
+ public void setValue(String value) {
+ this.value = value;
+ }
+ public boolean getIsFinal() {
+ return isFinal;
+ }
+ public void setIsFinal(boolean isFinal) {
+ this.isFinal = isFinal;
+ }
+ public String getResource() {
+ return resource;
+ }
+ public void setResource(String resource) {
+ this.resource = resource;
+ }
+ }
+
+}
+
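The tracker stores one child znode per configuration key under clusterConfigZNode, with the parent's data acting as an initialization marker. A hedged end-to-end sketch of the update flow, mirroring what the tests in this patch do (the master handle and the fixed settle delay are assumptions carried over from the tests):

    // Hedged usage sketch. Assumes a handle to the active HMaster, as the
    // tests obtain via miniHBaseCluster.getMaster().
    ClusterConfigTracker tracker = master.getClusterConfigTracker();
    tracker.updateClusterConfig("hbase.balancer.period", "200000");
    // The watch on clusterConfigZNode fires on every master and region server;
    // each calls processConfigChange() and refreshes its in-memory map.
    Thread.sleep(1000); // crude settle delay, mirroring the tests above
    String value = (String) tracker.getClusterConfigMap().get("hbase.balancer.period");
    assert "200000".equals(value);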
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1309612)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision )
@@ -19,39 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
-import java.io.StringWriter;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryUsage;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.management.ObjectName;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,44 +28,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.ClockOutOfSyncException;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HServerLoad;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterAddressTracker;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.UnknownRowLockException;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.Action;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.MultiResponse;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -110,24 +47,10 @@
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
-import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
-import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.Invocation;
-import org.apache.hadoop.hbase.ipc.ProtocolSignature;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.ipc.*;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
-import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
-import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
-import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
+import org.apache.hadoop.hbase.regionserver.handler.*;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerDynamicMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
@@ -136,22 +59,8 @@
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CompressionTest;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.InfoServer;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Sleeper;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
-import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
-import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.zookeeper.*;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
@@ -162,8 +71,25 @@
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -297,6 +223,7 @@
// Schema change Tracker
private SchemaChangeTracker schemaChangeTracker;
+ private ClusterConfigTracker clusterConfigTracker;
// Log Splitting Worker
private SplitLogWorker splitLogWorker;
@@ -363,8 +290,12 @@
*/
public HRegionServer(Configuration conf)
throws IOException, InterruptedException {
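+ // Create the ZooKeeper watcher up front so the cluster config tracker can
+ // resolve the effective configuration before the rest of construction reads
+ // it. The RPC port is not known yet at this point, hence the literal
+ // "isa.port" in the watcher's descriptive name.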
+ this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
+ "isa.port", this);
+
this.fsOk = true;
- this.conf = conf;
+ this.conf = getClusterConfiguration(conf);
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, LOG);
this.isOnline = false;
@@ -430,6 +361,18 @@
regionServerAccounting = new RegionServerAccounting();
cacheConfig = new CacheConfig(conf);
}
+
+ private Configuration getClusterConfiguration(Configuration configuration) {
+ // Start the cluster config tracker; when dynamic configuration is enabled,
+ // the ZK-backed in-memory configuration replaces the file-based one.
+ this.clusterConfigTracker = new ClusterConfigTracker(getZooKeeper(), this, configuration);
+ this.clusterConfigTracker.start();
+ boolean dynamicConfigEnabled = configuration.getBoolean(
+ "hbase.dynamic.configuration.enabled", false);
+ if (!dynamicConfigEnabled) return configuration;
+ return clusterConfigTracker.getClusterConfiguration();
+ }
+
/**
* Run test on configured codecs to make sure supporting libs are in place.
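One thing to note in the hunk above: getClusterConfiguration starts the tracker (and registers a ZK listener) before checking the hbase.dynamic.configuration.enabled flag, so disabled clusters still watch the config znode. If that is unintended, a guarded variant would look like this (a suggestion, not part of the patch; HMaster may rely on the tracker existing even when the feature is off, so the same change may not apply there):

    private Configuration getClusterConfiguration(Configuration configuration) {
      boolean dynamicConfigEnabled = configuration.getBoolean(
          "hbase.dynamic.configuration.enabled", false);
      if (!dynamicConfigEnabled) {
        return configuration; // no tracker, no ZK listener
      }
      this.clusterConfigTracker = new ClusterConfigTracker(getZooKeeper(), this, configuration);
      this.clusterConfigTracker.start();
      return clusterConfigTracker.getClusterConfiguration();
    }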
Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1309612)
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision )
@@ -111,6 +111,7 @@
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hbase.zookeeper.ClusterConfigTracker;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics.util.MBeanUtil;
@@ -187,7 +188,8 @@
// Schema change tracker
private MasterSchemaChangeTracker schemaChangeTracker;
-
+ // Cluster config tracker
+ private ClusterConfigTracker clusterConfigTracker = null;
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
// operations/debugging.
@@ -250,7 +252,10 @@
*/
public HMaster(final Configuration conf)
throws IOException, KeeperException, InterruptedException {
- this.conf = new Configuration(conf);
+ // The watcher name uses a literal "isa.port" placeholder; the RPC port is
+ // not known this early in construction.
+ this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + "isa.port", this, true);
+ this.conf = getClusterConfiguration(conf);
// Disable the block cache on the master
this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
// Set how many times to retry talking to another server over HConnection.
@@ -303,6 +308,23 @@
this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf);
}
+ private Configuration getClusterConfiguration(Configuration configuration) {
+ this.clusterConfigTracker = new ClusterConfigTracker(getZooKeeper(), this, configuration);
+ this.clusterConfigTracker.start();
+ boolean dynamicConfigEnabled = configuration.getBoolean(
+ "hbase.dynamic.configuration.enabled", false);
+ if (!dynamicConfigEnabled) {
+ LOG.info("HBase runtime dynamic cluster configuration is disabled. Configuration " +
+ "changes require a config file change and a cluster restart.");
+ // Preserve the defensive copy the previous constructor code made.
+ return new Configuration(configuration);
+ }
+ LOG.info("HBase runtime dynamic cluster configuration is enabled. Configuration changes" +
+ " can be done through the shell and does not require cluster restart");
+ // Initialize cluster config tracker
+ return clusterConfigTracker.getClusterConfiguration();
+ }
+
/**
* Get whether instant schema change is on or not.
* @param c
@@ -457,6 +479,12 @@
conf.getInt("hbase.instant.schema.alter.timeout", 60000));
this.schemaChangeTracker.start();
+ // Initialize cluster config tracker
+ if (this.clusterConfigTracker == null) {
+ this.clusterConfigTracker = new ClusterConfigTracker(getZooKeeper(), this, conf);
+ this.clusterConfigTracker.start();
+ }
+
LOG.info("Server active/primary master; " + this.serverName +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
@@ -820,6 +848,10 @@
return this.regionServerTracker;
}
+ public ClusterConfigTracker getClusterConfigTracker() {
+ return this.clusterConfigTracker;
+ }
+
/** @return InfoServer object. Maybe null.*/
public InfoServer getInfoServer() {
return this.infoServer;
@@ -1068,6 +1100,16 @@
}
@Override
+ public boolean updateConfig(String configKey, String configValue) {
+ return this.clusterConfigTracker.updateClusterConfig(configKey, configValue);
+ }
+
+ @Override
+ public String getConfig(String configKey) {
+ return this.conf.get(configKey);
+ }
+
+ @Override
public boolean balance() {
// If balance not true, don't run balancer.
if (!this.balanceSwitch) return false;
@@ -1631,8 +1673,10 @@
private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
IOException, KeeperException, ExecutionException {
- this.zooKeeper.reconnectAfterExpiration();
-
+ this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":"
+ + this.serverName.getPort(), this, true);
+ // Set cluster config tracker to null to trigger a reload.
+ this.clusterConfigTracker = null;
Callable