diff --git src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
index 9c9c7cc..7defac0 100644
--- src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
+++ src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
@@ -309,10 +309,12 @@ public class LocalHBaseCluster {
*/
public HMaster getActiveMaster() {
for (JVMClusterUtil.MasterThread mt : masterThreads) {
- // Ensure that the current active master is not stopped.
- // We don't want to return a stopping master as an active master.
- if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
- return mt.getMaster();
+ if (mt.getMaster().isActiveMaster()) {
+ // Ensure that the current active master is not stopped.
+ // We don't want to return a stopping master as an active master.
+ if (!mt.getMaster().isStopped()) {
+ return mt.getMaster();
+ }
}
}
return null;
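
For context, the reworked loop above only returns a master that has both won election and is not already stopping, so callers generally poll until a non-null result appears. A minimal sketch of such a caller, assuming a locally started cluster; the class and variable names here are illustrative, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.master.HMaster;

public class WaitForActiveMaster {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
    cluster.startup();
    // Null until some master has won election and is not stopping.
    HMaster active = cluster.getActiveMaster();
    while (active == null) {
      Thread.sleep(100);
      active = cluster.getActiveMaster();
    }
    System.out.println("Active master: " + active.getServerName());
    cluster.shutdown();
  }
}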
diff --git src/main/java/org/apache/hadoop/hbase/MasterAddressTracker.java src/main/java/org/apache/hadoop/hbase/MasterAddressTracker.java
deleted file mode 100644
index a246abe..0000000
--- src/main/java/org/apache/hadoop/hbase/MasterAddressTracker.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * Manages the location of the current active Master for this RegionServer.
- *
- * Listens for ZooKeeper events related to the master address. The node
- * /master will contain the address of the current master.
- * This listener is interested in
- * NodeDeleted and NodeCreated events on
- * /master.
- *
- * Utilizes {@link ZooKeeperNodeTracker} for zk interactions.
- *
- * You can get the current master via {@link #getMasterAddress()}
- */
-@InterfaceAudience.Private
-public class MasterAddressTracker extends ZooKeeperNodeTracker {
- /**
- * Construct a master address listener with the specified
- * zookeeper reference.
- *
- * This constructor does not trigger any actions, you must call methods
- * explicitly. Normally you will just want to execute {@link #start()} to
- * begin tracking of the master address.
- *
- * @param watcher zk reference and watcher
- * @param abortable abortable in case of fatal error
- */
- public MasterAddressTracker(ZooKeeperWatcher watcher, Abortable abortable) {
- super(watcher, watcher.masterAddressZNode, abortable);
- }
-
- /**
- * Get the address of the current master if one is available. Returns null
- * if no current master.
- * @return Server name or null if no master is currently set.
- */
- public ServerName getMasterAddress() {
- return bytesToServerName(super.getData(false));
- }
-
- /**
- * Check if there is a master available.
- * @return true if there is a master set, false if not.
- */
- public boolean hasMaster() {
- return super.getData(false) != null;
- }
-
- /**
- * @param bytes Byte array of {@link ServerName#toString()}
- * @return A {@link ServerName} instance.
- */
- private ServerName bytesToServerName(final byte [] bytes) {
- return bytes == null ? null: ServerName.parseVersionedServerName(bytes);
- }
-}
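
This class is not dropped outright: the patch relocates MasterAddressTracker to org.apache.hadoop.hbase.zookeeper and, as the hunks below show, adds static helpers (getMasterAddress, setMasterAddress) alongside the instance API. A rough sketch of the relocated class as exercised by this patch's own call sites; the exact signatures and checked exceptions are inferred from the surrounding try/catch blocks, not confirmed by the patch text:

import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class MasterLookup {
  // One-shot read of the active master's address, as in the
  // HConnectionManager hunk below.
  static ServerName currentMaster(ZooKeeperWatcher zkw)
      throws KeeperException, IOException {
    return MasterAddressTracker.getMasterAddress(zkw);
  }

  // Publish a ServerName under the master znode, as in the
  // ActiveMasterManager hunks below; false means the ephemeral node
  // already exists, i.e. another master won the election.
  static boolean publish(ZooKeeperWatcher zkw, ServerName sn)
      throws KeeperException, IOException {
    return MasterAddressTracker.setMasterAddress(zkw, zkw.getMasterAddressZNode(), sn);
  }
}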
diff --git src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index aa30969..7235e34 100644
--- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -654,14 +655,7 @@ public class HConnectionManager {
try {
checkIfBaseNodeAvailable(zkw);
-
- byte[] masterAddress = ZKUtil.getData(zkw, zkw.masterAddressZNode);
- if (masterAddress == null){
- throw new IOException("Can't get master address from ZooKeeper");
- }
-
- ServerName sn = ServerName.parseVersionedServerName(masterAddress);
-
+ ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
if (sn == null) {
String msg =
"ZooKeeper available but no active master location found";
diff --git src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
index 6229858..4121508 100644
--- src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
+++ src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
@@ -144,12 +144,13 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
* Constructor
*/
EventType(int value) {}
- public boolean isSchemaChangeEvent() {
+ public boolean isOnlineSchemaChangeSupported() {
return (
- this.equals(EventType.C_M_ADD_FAMILY) ||
- this.equals(EventType.C_M_DELETE_FAMILY) ||
- this.equals(EventType.C_M_MODIFY_FAMILY) ||
- this.equals(EventType.C_M_MODIFY_TABLE));
+ this.equals(EventType.C_M_ADD_FAMILY) ||
+ this.equals(EventType.C_M_DELETE_FAMILY) ||
+ this.equals(EventType.C_M_MODIFY_FAMILY) ||
+ this.equals(EventType.C_M_MODIFY_TABLE)
+ );
}
}
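
The rename makes the predicate's intent explicit: it answers whether an event type may be applied to an enabled table, not whether it is a schema-change event per se. A sketch of the gating as wired up in the TableEventHandler hunk later in this patch; the wrapper class here is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;

public final class OnlineAlterCheck {
  // An online ALTER is permitted only when the cluster-wide flag is on
  // AND the specific event type supports online schema changes.
  public static boolean allowsOnlineAlter(Configuration conf, EventType type) {
    return conf.getBoolean("hbase.online.schema.update.enable", false)
        && type.isOnlineSchemaChangeSupported();
  }
}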
diff --git src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
index ea7ae45..ce81547 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
@@ -266,12 +266,4 @@ public interface HMasterInterface extends VersionedProtocol {
* @return array of HTableDescriptor
*/
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames);
-
- /**
- * Returns the current running status of load balancer.
- * @return True if LoadBalancer is running now else False.
- */
- public boolean isLoadBalancerRunning();
-
-
}
diff --git src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
index ef19cb9..16b8e4a 100644
--- src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
@@ -19,21 +19,21 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
/**
* Handles everything on master-side related to master election.
@@ -70,14 +70,16 @@ class ActiveMasterManager extends ZooKeeperListener {
@Override
public void nodeCreated(String path) {
- if(path.equals(watcher.masterAddressZNode) && !master.isStopped()) {
- handleMasterNodeChange();
- }
+ handle(path);
}
@Override
public void nodeDeleted(String path) {
- if(path.equals(watcher.masterAddressZNode) && !master.isStopped()) {
+ handle(path);
+ }
+
+ void handle(final String path) {
+ if (path.equals(watcher.getMasterAddressZNode()) && !master.isStopped()) {
handleMasterNodeChange();
}
}
@@ -99,7 +101,7 @@ class ActiveMasterManager extends ZooKeeperListener {
// Watch the node and check if it exists.
try {
synchronized(clusterHasActiveMaster) {
- if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
+ if (ZKUtil.watchAndCheckExists(watcher, watcher.getMasterAddressZNode())) {
// A master node exists, there is an active master
LOG.debug("A master is now available");
clusterHasActiveMaster.set(true);
@@ -136,14 +138,11 @@ class ActiveMasterManager extends ZooKeeperListener {
// Try to become the active master, watch if there is another master.
// Write out our ServerName as versioned bytes.
try {
- String backupZNode = ZKUtil.joinZNode(
- this.watcher.backupMasterAddressesZNode, this.sn.toString());
- if (ZKUtil.createEphemeralNodeAndWatch(this.watcher,
- this.watcher.masterAddressZNode, this.sn.getVersionedBytes())) {
+ String backupZNode = ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString());
+ if (MasterAddressTracker.setMasterAddress(this.watcher, this.watcher.getMasterAddressZNode(), this.sn)) {
// If we were a backup master before, delete our ZNode from the backup
// master directory since we are the active now
- LOG.info("Deleting ZNode for " + backupZNode +
- " from backup master directory");
+ LOG.info("Deleting ZNode for " + backupZNode + " from backup master directory");
ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
// We are the master, return
@@ -165,14 +164,12 @@ class ActiveMasterManager extends ZooKeeperListener {
* this node explicitly. If we crash before then, ZooKeeper will delete
* this node for us since it is ephemeral.
*/
- LOG.info("Adding ZNode for " + backupZNode +
- " in backup master directory");
- ZKUtil.createEphemeralNodeAndWatch(this.watcher, backupZNode,
- this.sn.getVersionedBytes());
+ LOG.info("Adding ZNode for " + backupZNode + " in backup master directory");
+ MasterAddressTracker.setMasterAddress(this.watcher, backupZNode, this.sn);
String msg;
byte[] bytes =
- ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode);
+ ZKUtil.getDataAndWatch(this.watcher, this.watcher.getMasterAddressZNode());
if (bytes == null) {
msg = ("A master was detected, but went down before its address " +
"could be read. Attempting to become the next active master");
@@ -182,7 +179,7 @@ class ActiveMasterManager extends ZooKeeperListener {
msg = ("Current master has this master's address, " +
currentMaster + "; master was restarted? Deleting node.");
// Hurry along the expiration of the znode.
- ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
+ ZKUtil.deleteNode(this.watcher, this.watcher.getMasterAddressZNode());
} else {
msg = "Another master is the active master, " + currentMaster +
"; waiting to become the next active master";
@@ -221,10 +218,10 @@ class ActiveMasterManager extends ZooKeeperListener {
*/
public boolean isActiveMaster() {
try {
- if (ZKUtil.checkExists(watcher, watcher.masterAddressZNode) >= 0) {
+ if (ZKUtil.checkExists(watcher, watcher.getMasterAddressZNode()) >= 0) {
return true;
}
- }
+ }
catch (KeeperException ke) {
LOG.info("Received an unexpected KeeperException when checking " +
"isActiveMaster : "+ ke);
@@ -235,12 +232,14 @@ class ActiveMasterManager extends ZooKeeperListener {
public void stop() {
try {
// If our address is in ZK, delete it on our way out
- byte [] bytes =
- ZKUtil.getDataAndWatch(watcher, watcher.masterAddressZNode);
- // TODO: redo this to make it atomic (only added for tests)
- ServerName master = ServerName.parseVersionedServerName(bytes);
- if (master != null && master.equals(this.sn)) {
- ZKUtil.deleteNode(watcher, watcher.masterAddressZNode);
+ ServerName activeMaster = null;
+ try {
+ activeMaster = MasterAddressTracker.getMasterAddress(this.watcher);
+ } catch (IOException e) {
+ LOG.warn("Failed get of master address: " + e.toString());
+ }
+ if (activeMaster != null && activeMaster.equals(this.sn)) {
+ ZKUtil.deleteNode(watcher, watcher.getMasterAddressZNode());
}
} catch (KeeperException e) {
LOG.error(this.watcher.prefix("Error deleting our own master address node"), e);
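
Folding nodeCreated and nodeDeleted into a shared handle(path) keeps the znode-path and liveness guard in one place. The same funneling pattern, sketched against the generic listener base class; the subclass here is illustrative, not part of the patch:

import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public abstract class GuardedNodeListener extends ZooKeeperListener {
  private final String znode;

  protected GuardedNodeListener(ZooKeeperWatcher watcher, String znode) {
    super(watcher);
    this.znode = znode;
  }

  @Override
  public void nodeCreated(String path) { handle(path); }

  @Override
  public void nodeDeleted(String path) { handle(path); }

  // Both callbacks route through one guard, so the path check is
  // written (and maintained) exactly once.
  private void handle(String path) {
    if (path.equals(znode)) {
      onNodeChanged();
    }
  }

  /** Invoked when the tracked znode is created or deleted. */
  protected abstract void onNodeChanged();
}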
diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 26e9552..6f2f1d2 100644
--- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -312,6 +312,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
return new Pair<Integer, Integer>(pending, hris.size());
}
+
/**
* Reset all unassigned znodes. Called on startup of master.
* Call {@link #assignAllUserRegions()} after root and meta have been assigned.
diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index daf3b07..fcf0ac7 100644
--- src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
@@ -91,6 +90,7 @@ import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableEventHandler;
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
+import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterId;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -184,10 +183,7 @@ Server {
private CatalogTracker catalogTracker;
// Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker;
-
- // Schema change tracker
- private MasterSchemaChangeTracker schemaChangeTracker;
-
+
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
// operations/debugging.
@@ -215,18 +211,12 @@ Server {
private CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner;
- private Thread schemaJanitorChore;
private MasterCoprocessorHost cpHost;
private final ServerName serverName;
private TableDescriptors tableDescriptors;
- // Whether or not schema alter changes go through ZK or not.
- private boolean supportInstantSchemaChanges = false;
-
- private volatile boolean loadBalancerRunning = false;
-
// Time stamps for when a hmaster was started and when it became active
private long masterStartTime;
private long masterActiveTime;
@@ -300,18 +290,6 @@ Server {
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
this.rpcServer.startThreads();
this.metrics = new MasterMetrics(getServerName().toString());
- this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf);
- }
-
- /**
- * Get whether instant schema change is on or not.
- * @param c
- * @return True if instant schema enabled.
- */
- private boolean getSupportInstantSchemaChanges(final Configuration c) {
- boolean b = c.getBoolean("hbase.instant.schema.alter.enabled", false);
- LOG.debug("Instant schema change enabled=" + b + ".");
- return b;
}
/**
@@ -451,12 +429,6 @@ Server {
boolean wasUp = this.clusterStatusTracker.isClusterUp();
if (!wasUp) this.clusterStatusTracker.setClusterUp();
- // initialize schema change tracker
- this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(),
- this, this,
- conf.getInt("hbase.instant.schema.alter.timeout", 60000));
- this.schemaChangeTracker.start();
-
LOG.info("Server active/primary master; " + this.serverName +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
@@ -596,9 +568,6 @@ Server {
this.catalogJanitorChore = new CatalogJanitor(this, this);
Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
- // Schema janitor chore.
- this.schemaJanitorChore = getAndStartSchemaJanitorChore(this);
-
registerMBean();
status.markComplete("Initialization successful");
@@ -811,15 +780,6 @@ Server {
return this.tableDescriptors;
}
- @Override
- public MasterSchemaChangeTracker getSchemaChangeTracker() {
- return this.schemaChangeTracker;
- }
-
- public RegionServerTracker getRegionServerTracker() {
- return this.regionServerTracker;
- }
-
/** @return InfoServer object. Maybe null.*/
public InfoServer getInfoServer() {
return this.infoServer;
@@ -931,28 +891,7 @@ Server {
if (this.executorService != null) this.executorService.shutdown();
}
- /**
- * Start the schema janitor. This Janitor will periodically sweep the failed/expired schema
- * changes.
- * @param master
- * @return
- */
- private Thread getAndStartSchemaJanitorChore(final HMaster master) {
- String name = master.getServerName() + "-SchemaJanitorChore";
- int schemaJanitorPeriod =
- master.getConfiguration().getInt("hbase.instant.schema.janitor.period", 120000);
- // Start up the schema janitor chore
- Chore chore = new Chore(name, schemaJanitorPeriod, master) {
- @Override
- protected void chore() {
- master.getSchemaChangeTracker().handleFailedOrExpiredSchemaChanges();
- }
- };
- return Threads.setDaemonThreadRunning(chore.getThread());
- }
-
-
- private Thread getAndStartBalancerChore(final HMaster master) {
+ private static Thread getAndStartBalancerChore(final HMaster master) {
String name = master.getServerName() + "-BalancerChore";
int balancerPeriod =
master.getConfiguration().getInt("hbase.balancer.period", 300000);
@@ -973,10 +912,6 @@ Server {
if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.interrupt();
}
- if (this.schemaJanitorChore != null) {
- this.schemaJanitorChore.interrupt();
- }
-
}
@Override
@@ -1058,15 +993,6 @@ Server {
return balancerCutoffTime;
}
-
- /**
- * Check whether the Load Balancer is currently running.
- * @return true if the Load balancer is currently running.
- */
- public boolean isLoadBalancerRunning() {
- return loadBalancerRunning;
- }
-
@Override
public boolean balance() {
// If balance not true, don't run balancer.
@@ -1074,33 +1000,23 @@ Server {
// Do this call outside of synchronized block.
int maximumBalanceTime = getBalancerCutoffTime();
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
- boolean balancerRan = false;
+ boolean balancerRan;
synchronized (this.balancer) {
- if (loadBalancerRunning) {
- LOG.debug("Load balancer is currently running. Skipping the current execution.");
- return false;
- }
-
// Only allow one balance run at at time.
if (this.assignmentManager.isRegionsInTransition()) {
LOG.debug("Not running balancer because " +
- this.assignmentManager.getRegionsInTransition().size() +
- " region(s) in transition: " +
- org.apache.commons.lang.StringUtils.
+ this.assignmentManager.getRegionsInTransition().size() +
+ " region(s) in transition: " +
+ org.apache.commons.lang.StringUtils.
abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
return false;
}
if (this.serverManager.areDeadServersInProgress()) {
LOG.debug("Not running balancer because processing dead regionserver(s): " +
- this.serverManager.getDeadServers());
- return false;
- }
- if (schemaChangeTracker.isSchemaChangeInProgress()) {
- LOG.debug("Schema change operation is in progress. Waiting for " +
- "it to complete before running the load balancer.");
+ this.serverManager.getDeadServers());
return false;
}
- loadBalancerRunning = true;
+
if (this.cpHost != null) {
try {
if (this.cpHost.preBalance()) {
@@ -1135,7 +1051,7 @@ Server {
// if performing next balance exceeds cutoff time, exit the loop
(System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
- maximumBalanceTime);
+ maximumBalanceTime);
break;
}
}
@@ -1148,7 +1064,6 @@ Server {
LOG.error("Error invoking master coprocessor postBalance()", ioe);
}
}
- loadBalancerRunning = false;
}
return balancerRan;
}
@@ -1298,9 +1213,7 @@ Server {
if (cpHost != null) {
cpHost.preDeleteTable(tableName);
}
- this.executorService.submit(new DeleteTableHandler(tableName, this, this, this,
- supportInstantSchemaChanges));
-
+ this.executorService.submit(new DeleteTableHandler(tableName, this, this));
if (cpHost != null) {
cpHost.postDeleteTable(tableName);
}
@@ -1312,6 +1225,7 @@ Server {
* @return Pair indicating the number of regions updated; Pair.getFirst() is the
* number of regions that are yet to be updated, Pair.getSecond() is the total
* number of regions of the table
+ * @throws IOException
*/
public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
throws IOException {
@@ -1319,44 +1233,9 @@ Server {
// may overlap with other table operations or the table operation may
// have completed before querying this API. We need to refactor to a
// transaction system in the future to avoid these ambiguities.
- if (supportInstantSchemaChanges) {
- return getAlterStatusFromSchemaChangeTracker(tableName);
- }
return this.assignmentManager.getReopenStatus(tableName);
}
- /**
- * Used by the client to identify if all regions have the schema updates
- *
- * @param tableName
- * @return Pair indicating the status of the alter command
- * @throws IOException
- */
- private Pair<Integer, Integer> getAlterStatusFromSchemaChangeTracker(byte[] tableName)
- throws IOException {
- MasterSchemaChangeTracker.MasterAlterStatus alterStatus = null;
- try {
- alterStatus =
- this.schemaChangeTracker.getMasterAlterStatus(Bytes.toString(tableName));
- } catch (KeeperException ke) {
- LOG.error("KeeperException while getting schema alter status for table = "
- + Bytes.toString(tableName), ke);
- }
- if (alterStatus != null) {
- LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = "
- + Bytes.toString(tableName) + " Alter Status = "
- + alterStatus.toString());
- return new Pair<Integer, Integer>(alterStatus.getNumberOfRegionsProcessed(),
- alterStatus.getNumberOfRegionsToProcess());
- } else {
- LOG.debug("MasterAlterStatus is NULL for table = "
- + Bytes.toString(tableName));
- // should we throw IOException here as it makes more sense?
- return new Pair<Integer, Integer>(0, 0);
- }
- }
-
-
public void addColumn(byte [] tableName, HColumnDescriptor column)
throws IOException {
checkInitialized();
@@ -1365,8 +1244,7 @@ Server {
return;
}
}
- new TableAddFamilyHandler(tableName, column, this, this,
- this, supportInstantSchemaChanges).process();
+ new TableAddFamilyHandler(tableName, column, this, this).process();
if (cpHost != null) {
cpHost.postAddColumn(tableName, column);
}
@@ -1380,8 +1258,7 @@ Server {
return;
}
}
- new TableModifyFamilyHandler(tableName, descriptor, this, this,
- this, supportInstantSchemaChanges).process();
+ new TableModifyFamilyHandler(tableName, descriptor, this, this).process();
if (cpHost != null) {
cpHost.postModifyColumn(tableName, descriptor);
}
@@ -1395,8 +1272,7 @@ Server {
return;
}
}
- new TableDeleteFamilyHandler(tableName, c, this, this,
- this, supportInstantSchemaChanges).process();
+ new TableDeleteFamilyHandler(tableName, c, this, this).process();
if (cpHost != null) {
cpHost.postDeleteColumn(tableName, c);
}
@@ -1408,7 +1284,7 @@ Server {
cpHost.preEnableTable(tableName);
}
this.executorService.submit(new EnableTableHandler(this, tableName,
- catalogTracker, assignmentManager, false));
+ catalogTracker, assignmentManager, false));
if (cpHost != null) {
cpHost.postEnableTable(tableName);
@@ -1470,36 +1346,16 @@ Server {
if (cpHost != null) {
cpHost.preModifyTable(tableName, htd);
}
- TableEventHandler tblHandle = new ModifyTableHandler(tableName, htd, this,
- this, this, supportInstantSchemaChanges);
+ TableEventHandler tblHandle = new ModifyTableHandler(tableName, htd, this, this);
this.executorService.submit(tblHandle);
tblHandle.waitForPersist();
-
if (cpHost != null) {
cpHost.postModifyTable(tableName, htd);
}
}
- private boolean isOnlineSchemaChangeAllowed() {
- return conf.getBoolean(
- "hbase.online.schema.update.enable", false);
- }
-
@Override
- public void checkTableModifiable(final byte [] tableName,
- EventHandler.EventType eventType)
- throws IOException {
- preCheckTableModifiable(tableName);
- if (!eventType.isSchemaChangeEvent() ||
- !isOnlineSchemaChangeAllowed()) {
- if (!getAssignmentManager().getZKTable().
- isDisabledTable(Bytes.toString(tableName))) {
- throw new TableNotDisabledException(tableName);
- }
- }
- }
-
- private void preCheckTableModifiable(final byte[] tableName)
+ public void checkTableModifiable(final byte [] tableName)
throws IOException {
String tableNameStr = Bytes.toString(tableName);
if (isCatalogTable(tableName)) {
@@ -1508,6 +1364,10 @@ Server {
if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
throw new TableNotFoundException(tableNameStr);
}
+ if (!getAssignmentManager().getZKTable().
+ isDisabledTable(tableNameStr)) {
+ throw new TableNotDisabledException(tableName);
+ }
}
public void clearFromTransition(HRegionInfo hri) {
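
Dropping loadBalancerRunning simplifies balance(): instead of a hand-rolled flag that had to be set and cleared on every path out of the method, mutual exclusion now comes entirely from the monitor on this.balancer, so a second caller blocks until the first run finishes rather than being turned away. The shape of that change, sketched with illustrative names:

public class SingleRunBalancer {
  private final Object balancer = new Object();

  public boolean balance() {
    synchronized (balancer) {
      // Precondition checks (regions in transition, dead servers, ...)
      // still bail out early with false, but there is no separate
      // "already running" flag to maintain on every exit path.
      if (!preconditionsHold()) {
        return false;
      }
      runRegionPlans();
      return true;
    }
  }

  private boolean preconditionsHold() { return true; }

  private void runRegionPlans() { }
}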
diff --git src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 8a7da2e..befc764 100644
--- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -514,7 +514,7 @@ public class MasterFileSystem {
*/
public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd)
throws IOException {
- LOG.debug("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
+ LOG.info("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
hcd.toString());
HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
if (htd == null) {
diff --git src/main/java/org/apache/hadoop/hbase/master/MasterServices.java src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index e1e6685..989e675 100644
--- src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
/**
@@ -56,15 +55,12 @@ public interface MasterServices extends Server {
public ExecutorService getExecutorService();
/**
- * Check table modifiable. i.e not ROOT or META and offlined for all commands except
- * alter commands
- * @param tableName
- * @param eventType
- * @throws IOException
+ * Check that a table is modifiable; i.e. it exists and is offline.
+ * @param tableName Name of table to check.
+ * @throws TableNotDisabledException
+ * @throws TableNotFoundException
*/
- public void checkTableModifiable(final byte [] tableName,
- EventHandler.EventType eventType)
- throws IOException;
+ public void checkTableModifiable(final byte [] tableName) throws IOException;
/**
* Create a table using the given table definition.
@@ -81,20 +77,7 @@ public interface MasterServices extends Server {
public TableDescriptors getTableDescriptors();
/**
- * Get Master Schema change tracker
- * @return
- */
- public MasterSchemaChangeTracker getSchemaChangeTracker();
-
- /**
- * Return the Region server tracker.
- * @return RegionServerTracker
- */
- public RegionServerTracker getRegionServerTracker();
-
- /**
* @return true if master enables ServerShutdownHandler;
*/
public boolean isServerShutdownHandlerEnabled();
-
}
diff --git src/main/java/org/apache/hadoop/hbase/master/ServerManager.java src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 3515d4a..1c253a0 100644
--- src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -345,15 +345,6 @@ public class ServerManager {
}
}
- /**
- * Exclude a RS from any pending schema change process.
- * @param serverName
- */
- private void excludeRegionServerFromSchemaChanges(final ServerName serverName) {
- this.services.getSchemaChangeTracker()
- .excludeRegionServerForSchemaChanges(serverName.getServerName());
- }
-
/*
* Expire the passed server. Add it to list of deadservers and queue a
* shutdown processing.
@@ -365,7 +356,6 @@ public class ServerManager {
this.deadNotExpiredServers.add(serverName);
return;
}
- excludeRegionServerFromSchemaChanges(serverName);
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Received expiration of " + serverName +
" but server is not currently online");
diff --git src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
index 02bec37..bb3d5cc 100644
--- src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
+++ src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -40,11 +39,9 @@ public class DeleteTableHandler extends TableEventHandler {
private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
public DeleteTableHandler(byte [] tableName, Server server,
- final MasterServices masterServices, HMasterInterface masterInterface,
- boolean instantChange)
+ final MasterServices masterServices)
throws IOException {
- super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices,
- masterInterface, instantChange);
+ super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices);
// The next call fails if no such table.
getTableDescriptor();
}
diff --git src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
index 4a735ec..c11238e 100644
--- src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
+++ src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices;
@InterfaceAudience.Private
@@ -35,11 +34,9 @@ public class ModifyTableHandler extends TableEventHandler {
public ModifyTableHandler(final byte [] tableName,
final HTableDescriptor htd, final Server server,
- final MasterServices masterServices, final HMasterInterface masterInterface,
- boolean instantModify)
+ final MasterServices masterServices)
throws IOException {
- super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices,
- masterInterface, instantModify);
+ super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices);
// Check table exists.
getTableDescriptor();
// This is the new schema we are going to write out as this modification.
diff --git src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
index 4ec3d68..ec726e9 100644
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices;
/**
@@ -40,10 +39,8 @@ public class TableAddFamilyHandler extends TableEventHandler {
private final HColumnDescriptor familyDesc;
public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc,
- Server server, final MasterServices masterServices,
- HMasterInterface masterInterface, boolean instantChange) throws IOException {
- super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
- masterInterface, instantChange);
+ Server server, final MasterServices masterServices) throws IOException {
+ super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
HTableDescriptor htd = getTableDescriptor();
if (htd.hasFamily(familyDesc.getName())) {
throw new InvalidFamilyOperationException("Family '" +
diff --git src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
index bfa624b..0fb20b6 100644
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -39,10 +38,8 @@ public class TableDeleteFamilyHandler extends TableEventHandler {
private final byte [] familyName;
public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName,
- Server server, final MasterServices masterServices,
- HMasterInterface masterInterface, boolean instantChange) throws IOException {
- super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
- masterInterface, instantChange);
+ Server server, final MasterServices masterServices) throws IOException {
+ super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices);
HTableDescriptor htd = getTableDescriptor();
this.familyName = hasColumnFamily(htd, familyName);
}
diff --git src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
index 6d173bb..906f4de 100644
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
@@ -37,19 +37,13 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import com.google.common.collect.Lists;
@@ -66,23 +60,33 @@ import com.google.common.collect.Maps;
public abstract class TableEventHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(TableEventHandler.class);
protected final MasterServices masterServices;
- protected HMasterInterface master = null;
protected final byte [] tableName;
protected final String tableNameStr;
- protected boolean instantAction = false;
protected boolean persistedToZk = false;
public TableEventHandler(EventType eventType, byte [] tableName, Server server,
- MasterServices masterServices, HMasterInterface masterInterface,
- boolean instantSchemaChange)
+ MasterServices masterServices)
throws IOException {
super(server, eventType);
this.masterServices = masterServices;
this.tableName = tableName;
- this.masterServices.checkTableModifiable(tableName, eventType);
+ try {
+ this.masterServices.checkTableModifiable(tableName);
+ } catch (TableNotDisabledException ex) {
+ if (isOnlineSchemaChangeAllowed()
+ && eventType.isOnlineSchemaChangeSupported()) {
+ LOG.debug("Ignoring table not disabled exception " +
+ "for supporting online schema changes.");
+ } else {
+ throw ex;
+ }
+ }
this.tableNameStr = Bytes.toString(this.tableName);
- this.instantAction = instantSchemaChange;
- this.master = masterInterface;
+ }
+
+ private boolean isOnlineSchemaChangeAllowed() {
+ return this.server.getConfiguration().getBoolean(
+ "hbase.online.schema.update.enable", false);
}
@Override
@@ -94,7 +98,16 @@ public abstract class TableEventHandler extends EventHandler {
MetaReader.getTableRegions(this.server.getCatalogTracker(),
tableName);
handleTableOperation(hris);
- handleSchemaChanges(hris);
+ if (eventType.isOnlineSchemaChangeSupported() && this.masterServices.
+ getAssignmentManager().getZKTable().
+ isEnabledTable(Bytes.toString(tableName))) {
+ if (reOpenAllRegions(hris)) {
+ LOG.info("Completed table operation " + eventType + " on table " +
+ Bytes.toString(tableName));
+ } else {
+ LOG.warn("Error on reopening the regions");
+ }
+ }
} catch (IOException e) {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
} catch (KeeperException e) {
@@ -105,48 +118,13 @@ public abstract class TableEventHandler extends EventHandler {
}
}
- private void handleSchemaChanges(List<HRegionInfo> regions)
- throws IOException {
- if (instantAction && regions != null && !regions.isEmpty()) {
- handleInstantSchemaChanges(regions);
- } else {
- handleRegularSchemaChanges(regions);
- }
- }
-
-
- /**
- * Perform schema changes only if the table is in enabled state.
- * @return
- */
- private boolean canPerformSchemaChange() {
- return (eventType.isSchemaChangeEvent() && this.masterServices.
- getAssignmentManager().getZKTable().
- isEnabledTable(Bytes.toString(tableName)));
- }
-
- private void handleRegularSchemaChanges(List<HRegionInfo> regions)
- throws IOException {
- if (canPerformSchemaChange()) {
- this.masterServices.getAssignmentManager().setRegionsToReopen(regions);
- setPersist();
- if (reOpenAllRegions(regions)) {
- LOG.info("Completed table operation " + eventType + " on table " +
- Bytes.toString(tableName));
- } else {
- LOG.warn("Error on reopening the regions");
- }
- }
- }
-
public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
boolean done = false;
LOG.info("Bucketing regions by region server...");
HTable table = new HTable(masterServices.getConfiguration(), tableName);
TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps
.newTreeMap();
- NavigableMap<HRegionInfo, ServerName> hriHserverMapping
- = table.getRegionLocations();
+ NavigableMap<HRegionInfo, ServerName> hriHserverMapping = table.getRegionLocations();
List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>();
for (HRegionInfo hri : regions) {
ServerName rsLocation = hriHserverMapping.get(hri);
@@ -189,32 +167,6 @@ public abstract class TableEventHandler extends EventHandler {
}
/**
- * Check whether any of the regions from the list of regions is undergoing a split.
- * We simply check whether there is an unassigned node for any of the regions and if so
- * we return true.
- * @param regionInfos
- * @return
- */
- private boolean isSplitInProgress(List<HRegionInfo> regionInfos) {
- for (HRegionInfo hri : regionInfos) {
- ZooKeeperWatcher zkw = this.masterServices.getZooKeeper();
- String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
- try {
- if (ZKUtil.checkExists(zkw, node) != -1) {
- LOG.debug("Region " + hri.getRegionNameAsString() + " is unassigned. Assuming" +
- " that it is undergoing a split");
- return true;
- }
- } catch (KeeperException ke) {
- LOG.debug("KeeperException while determining splits in progress.", ke);
- // Assume no splits happening?
- return false;
- }
- }
- return false;
- }
-
- /**
* Table modifications are processed asynchronously; an API is provided for
* you to query their status.
*
@@ -239,65 +191,6 @@ public abstract class TableEventHandler extends EventHandler {
}
/**
- * Wait for region split transaction in progress (if any)
- * @param regions
- * @param status
- */
- private void waitForInflightSplit(List<HRegionInfo> regions, MonitoredTask status) {
- while (isSplitInProgress(regions)) {
- try {
- status.setStatus("Alter Schema is waiting for split region to complete.");
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- protected void handleInstantSchemaChanges(List<HRegionInfo> regions) {
- if (regions == null || regions.isEmpty()) {
- LOG.debug("Region size is null or empty. Ignoring alter request.");
- return;
- }
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Handling alter table request for table = " + tableNameStr);
- if (canPerformSchemaChange()) {
- boolean prevBalanceSwitch = false;
- try {
- // turn off load balancer synchronously
- prevBalanceSwitch = master.synchronousBalanceSwitch(false);
- waitForInflightSplit(regions, status);
- MasterSchemaChangeTracker masterSchemaChangeTracker =
- this.masterServices.getSchemaChangeTracker();
- masterSchemaChangeTracker
- .createSchemaChangeNode(Bytes.toString(tableName),
- regions.size());
- while(!masterSchemaChangeTracker.doesSchemaChangeNodeExists(
- Bytes.toString(tableName))) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- status.markComplete("Created ZK node for handling the alter table request for table = "
- + tableNameStr);
- } catch (KeeperException e) {
- LOG.warn("Instant schema change failed for table " + tableNameStr, e);
- status.setStatus("Instant schema change failed for table " + tableNameStr
- + " Cause = " + e.getCause());
-
- } catch (IOException ioe) {
- LOG.warn("Instant schema change failed for table " + tableNameStr, ioe);
- status.setStatus("Instant schema change failed for table " + tableNameStr
- + " Cause = " + ioe.getCause());
- } finally {
- master.synchronousBalanceSwitch(prevBalanceSwitch);
- }
- }
- }
-
- /**
* @return Table descriptor for this table
* @throws TableExistsException
* @throws FileNotFoundException
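
As the "processed asynchronously" javadoc above notes, completion of a table modification is observed by polling its alter status. A minimal client-side polling sketch; it assumes the era's HBaseAdmin#getAlterStatus wrapper around the master call shown in the HMaster hunk, and the table name is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class AlterStatusPoll {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    byte[] table = Bytes.toBytes("t1"); // illustrative table name
    // Pair.getFirst() = regions still to be updated,
    // Pair.getSecond() = total regions of the table.
    Pair<Integer, Integer> status = admin.getAlterStatus(table);
    while (status.getFirst() > 0) {
      Thread.sleep(1000);
      status = admin.getAlterStatus(table);
    }
    System.out.println("Schema change applied to all " +
        status.getSecond() + " regions");
  }
}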
diff --git src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
index b4f8cd4..e1868a2 100644
--- src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
+++ src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
@@ -41,10 +40,8 @@ public class TableModifyFamilyHandler extends TableEventHandler {
public TableModifyFamilyHandler(byte[] tableName,
HColumnDescriptor familyDesc, Server server,
- final MasterServices masterServices,
- HMasterInterface masterInterface, boolean instantChange) throws IOException {
- super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices,
- masterInterface, instantChange);
+ final MasterServices masterServices) throws IOException {
+ super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices);
HTableDescriptor htd = getTableDescriptor();
hasColumnFamily(htd, familyDesc.getName());
this.familyDesc = familyDesc;
diff --git src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 8ff87fe..7304a9e 100644
--- src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -481,11 +481,489 @@ public final class ZooKeeperProtos {
// @@protoc_insertion_point(class_scope:RootRegionServer)
}
+ public interface MasterOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required .ServerName master = 1;
+ boolean hasMaster();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getMaster();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getMasterOrBuilder();
+ }
+ public static final class Master extends
+ com.google.protobuf.GeneratedMessage
+ implements MasterOrBuilder {
+ // Use Master.newBuilder() to construct.
+ private Master(Builder builder) {
+ super(builder);
+ }
+ private Master(boolean noInit) {}
+
+ private static final Master defaultInstance;
+ public static Master getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public Master getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_Master_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_Master_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // required .ServerName master = 1;
+ public static final int MASTER_FIELD_NUMBER = 1;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName master_;
+ public boolean hasMaster() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getMaster() {
+ return master_;
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getMasterOrBuilder() {
+ return master_;
+ }
+
+ private void initFields() {
+ master_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasMaster()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getMaster().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeMessage(1, master_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(1, master_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master) obj;
+
+ boolean result = true;
+ result = result && (hasMaster() == other.hasMaster());
+ if (hasMaster()) {
+ result = result && getMaster()
+ .equals(other.getMaster());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasMaster()) {
+ hash = (37 * hash) + MASTER_FIELD_NUMBER;
+ hash = (53 * hash) + getMaster().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.MasterOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_Master_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_Master_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getMasterFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ if (masterBuilder_ == null) {
+ master_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ } else {
+ masterBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master build() {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ if (masterBuilder_ == null) {
+ result.master_ = master_;
+ } else {
+ result.master_ = masterBuilder_.build();
+ }
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master.getDefaultInstance()) return this;
+ if (other.hasMaster()) {
+ mergeMaster(other.getMaster());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasMaster()) {
+
+ return false;
+ }
+ if (!getMaster().isInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 10: {
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
+ if (hasMaster()) {
+ subBuilder.mergeFrom(getMaster());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setMaster(subBuilder.buildPartial());
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required .ServerName master = 1;
+ private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName master_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> masterBuilder_;
+ public boolean hasMaster() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getMaster() {
+ if (masterBuilder_ == null) {
+ return master_;
+ } else {
+ return masterBuilder_.getMessage();
+ }
+ }
+ public Builder setMaster(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
+ if (masterBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ master_ = value;
+ onChanged();
+ } else {
+ masterBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder setMaster(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) {
+ if (masterBuilder_ == null) {
+ master_ = builderForValue.build();
+ onChanged();
+ } else {
+ masterBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder mergeMaster(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
+ if (masterBuilder_ == null) {
+ if (((bitField0_ & 0x00000001) == 0x00000001) &&
+ master_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
+ master_ =
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(master_).mergeFrom(value).buildPartial();
+ } else {
+ master_ = value;
+ }
+ onChanged();
+ } else {
+ masterBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ public Builder clearMaster() {
+ if (masterBuilder_ == null) {
+ master_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ onChanged();
+ } else {
+ masterBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getMasterBuilder() {
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return getMasterFieldBuilder().getBuilder();
+ }
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getMasterOrBuilder() {
+ if (masterBuilder_ != null) {
+ return masterBuilder_.getMessageOrBuilder();
+ } else {
+ return master_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>
+ getMasterFieldBuilder() {
+ if (masterBuilder_ == null) {
+ masterBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>(
+ master_,
+ getParentForChildren(),
+ isClean());
+ master_ = null;
+ }
+ return masterBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:Master)
+ }
+
+ static {
+ defaultInstance = new Master(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:Master)
+ }
+
private static com.google.protobuf.Descriptors.Descriptor
internal_static_RootRegionServer_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_RootRegionServer_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_Master_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_Master_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -496,9 +974,10 @@ public final class ZooKeeperProtos {
static {
java.lang.String[] descriptorData = {
"\n\017ZooKeeper.proto\032\013hbase.proto\"/\n\020RootRe" +
- "gionServer\022\033\n\006server\030\001 \002(\0132\013.ServerNameB" +
- "E\n*org.apache.hadoop.hbase.protobuf.gene" +
- "ratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
+ "gionServer\022\033\n\006server\030\001 \002(\0132\013.ServerName\"" +
+ "%\n\006Master\022\033\n\006master\030\001 \002(\0132\013.ServerNameBE" +
+ "\n*org.apache.hadoop.hbase.protobuf.gener" +
+ "atedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -513,6 +992,14 @@ public final class ZooKeeperProtos {
new java.lang.String[] { "Server", },
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer.class,
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer.Builder.class);
+ internal_static_Master_descriptor =
+ getDescriptor().getMessageTypes().get(1);
+ internal_static_Master_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_Master_descriptor,
+ new java.lang.String[] { "Master", },
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master.class,
+ org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Master.Builder.class);
return null;
}
};
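For orientation, a minimal sketch of how the newly generated Master message is used (assuming the standard protobuf-java 2.4 generated API, including the usual parseFrom(byte[]) entry point; host, port, and start code below are illustrative):

    // Build a Master message wrapping the active master's ServerName.
    HBaseProtos.ServerName sn = HBaseProtos.ServerName.newBuilder()
        .setHostName("host.example.org")  // illustrative values
        .setPort(60000)
        .setStartCode(1L)
        .build();
    ZooKeeperProtos.Master m = ZooKeeperProtos.Master.newBuilder()
        .setMaster(sn)
        .build();
    byte[] bytes = m.toByteArray();
    // Round-trip: 'master' is a required field, so a message without it
    // fails isInitialized() and cannot be built or parsed successfully.
    ZooKeeperProtos.Master parsed = ZooKeeperProtos.Master.parseFrom(bytes);
    assert parsed.hasMaster();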
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 2913c2b..1081fc6 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -157,29 +157,12 @@ public class CompactSplitThread implements CompactionRequestor {
return false;
}
- /**
- * Wait for mid-flight schema alter requests. (if any). We don't want to execute a split
- * when a schema alter is in progress as we end up in an inconsistent state.
- * @param tableName
- */
- private void waitForInflightSchemaChange(String tableName) {
- while (this.server.getSchemaChangeTracker()
- .isSchemaChangeInProgress(tableName)) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
public synchronized void requestSplit(final HRegion r, byte[] midKey) {
if (midKey == null) {
LOG.debug("Region " + r.getRegionNameAsString() +
" not splittable because midkey=null");
return;
}
- waitForInflightSchemaChange(r.getRegionInfo().getTableNameAsString());
try {
this.splits.execute(new SplitRequest(r, midKey, this.server));
if (LOG.isDebugEnabled()) {
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4f80999..8a61f7d 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterAddressTracker;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
@@ -147,8 +146,8 @@ import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
-import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -294,9 +293,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Cluster Status Tracker
private ClusterStatusTracker clusterStatusTracker;
- // Schema change Tracker
- private SchemaChangeTracker schemaChangeTracker;
-
// Log Splitting Worker
private SplitLogWorker splitLogWorker;
@@ -599,11 +595,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start();
-
- // Schema change tracker
- this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper,
- this, this);
- this.schemaChangeTracker.start();
}
/**
@@ -2901,26 +2892,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
splitRegion(regionInfo, null);
}
- /**
- * Wait for mid-flight schema change requests. (if any)
- * @param tableName
- */
- private void waitForSchemaChange(String tableName) {
- while (schemaChangeTracker.isSchemaChangeInProgress(tableName)) {
- try {
- LOG.debug("Schema alter is inprogress for table = " + tableName
- + " Waiting for alter to complete before a split");
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
@Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException {
- waitForSchemaChange(Bytes.toString(regionInfo.getTableName()));
checkOpen();
HRegion region = getRegion(regionInfo.getRegionName());
region.flushcache();
@@ -3672,58 +3646,27 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
}
/**
- * Refresh schema changes for given region.
- * @param hRegion HRegion to refresh
- * @throws IOException
- */
- public void refreshRegion(HRegion hRegion) throws IOException {
-
- if (hRegion != null) {
+ * Gets the online regions of the specified table.
+ * This method looks at the in-memory onlineRegions. It does not go to .META..
+ * Only returns online regions. If a region on this table has been
+ * closed during a disable, etc., it will not be included in the returned list.
+ * So, the returned list is not necessarily ALL the regions in this table; it is
+ * only the ONLINE regions in the table.
+ * @param tableName
+ * @return Online regions from tableName
+ */
+ public List<HRegion> getOnlineRegions(byte[] tableName) {
+ List<HRegion> tableRegions = new ArrayList<HRegion>();
synchronized (this.onlineRegions) {
- HRegionInfo regionInfo = hRegion.getRegionInfo();
- // Close the region
- hRegion.close();
- // Remove from online regions
- removeFromOnlineRegions(regionInfo.getEncodedName());
- // Get new HTD
- HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName());
- LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString()
- + " Is = " + htd );
- HRegion region =
- HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf,
- this, null);
- // Add new region to the onlineRegions
- addToOnlineRegions(region);
+ for (HRegion region: this.onlineRegions.values()) {
+ HRegionInfo regionInfo = region.getRegionInfo();
+ if(Bytes.equals(regionInfo.getTableName(), tableName)) {
+ tableRegions.add(region);
+ }
+ }
}
+ return tableRegions;
}
- }
-
- /**
- * Gets the online regions of the specified table.
- * This method looks at the in-memory onlineRegions. It does not go to .META..
- * Only returns online regions. If a region on this table has been
- * closed during a disable, etc., it will not be included in the returned list.
- * So, the returned list may not necessarily be ALL regions in this table, its
- * all the ONLINE regions in the table.
- * @param tableName
- * @return Online regions from tableName
- */
- public List<HRegion> getOnlineRegions(byte[] tableName) {
- List<HRegion> tableRegions = new ArrayList<HRegion>();
- synchronized (this.onlineRegions) {
- for (HRegion region: this.onlineRegions.values()) {
- HRegionInfo regionInfo = region.getRegionInfo();
- if(Bytes.equals(regionInfo.getTableName(), tableName)) {
- tableRegions.add(region);
- }
- }
- }
- return tableRegions;
- }
-
- public SchemaChangeTracker getSchemaChangeTracker() {
- return this.schemaChangeTracker;
- }
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() {
@@ -3741,5 +3684,4 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
mxBeanInfo);
LOG.info("Registered RegionServer MXBean");
}
-
-}
+}
\ No newline at end of file
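Since the schema-change plumbing is gone, getOnlineRegions(byte[]) survives here only as a plain lookup; a minimal caller sketch, assuming a live HRegionServer in scope (table name and variable names are illustrative):

    // Only ONLINE regions are returned; regions closed by a disable or a
    // split are simply absent, so this is not necessarily ALL regions.
    byte[] tableName = Bytes.toBytes("usertable");  // hypothetical table
    List<HRegion> regions = regionServer.getOnlineRegions(tableName);
    for (HRegion r : regions) {
      LOG.debug("Online region: " + r.getRegionInfo().getRegionNameAsString());
    }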
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
index da9e5cf..b038ef3 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
@@ -19,12 +19,12 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Server;
-
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+
/**
* Interface to Map of online regions. In the Map, the key is the region's
* encoded name and the value is an {@link HRegion} instance.
@@ -54,18 +54,12 @@ interface OnlineRegions extends Server {
* null if named region is not member of the online regions.
*/
public HRegion getFromOnlineRegions(String encodedRegionName);
- /**
- * Get all online regions of a table in this RS.
- * @param tableName
- * @return List of HRegion
- * @throws java.io.IOException
- */
- public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException;
-
- /**
- * Refresh a given region updating it with latest HTD info.
- * @param hRegion
- */
- public void refreshRegion(HRegion hRegion) throws IOException;
-}
+ /**
+ * Get all online regions of a table in this RS.
+ * @param tableName
+ * @return List of HRegion
+ * @throws java.io.IOException
+ */
+ public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException;
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
new file mode 100644
index 0000000..8ba749b
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/MasterAddressTracker.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Manages the location of the current active Master for the RegionServer.
+ *
+ * Listens for ZooKeeper events related to the master address. The node
+ * /master will contain the address of the current master.
+ * This listener is interested in
+ * NodeDeleted and NodeCreated events on
+ * /master.
+ *
+ * Utilizes {@link ZooKeeperNodeTracker} for zk interactions.
+ *
+ * You can get the current master via {@link #getMasterAddress()} or via
+ * {@link #getMasterAddress(ZooKeeperWatcher)} if you do not have a running
+ * instance of this Tracker in your context.
+ *
+ * This class also includes utility methods for interacting with the master
+ * znode: writing and reading the znode content.
+ */
+@InterfaceAudience.Private
+public class MasterAddressTracker extends ZooKeeperNodeTracker {
+ /**
+ * Construct a master address listener with the specified
+ * zookeeper reference.
+ *
+ * This constructor does not trigger any actions, you must call methods
+ * explicitly. Normally you will just want to execute {@link #start()} to
+ * begin tracking of the master address.
+ *
+ * @param watcher zk reference and watcher
+ * @param abortable abortable in case of fatal error
+ */
+ public MasterAddressTracker(ZooKeeperWatcher watcher, Abortable abortable) {
+ super(watcher, watcher.getMasterAddressZNode(), abortable);
+ }
+
+ /**
+ * Get the address of the current master if one is available. Returns null
+ * if no current master.
+ * @return Server name or null if there is no current master.
+ */
+ public ServerName getMasterAddress() {
+ return bytesToServerName(super.getData(false));
+ }
+
+ /**
+ * Get master address.
+ * Use this instead of {@link #getMasterAddress()} if you do not have an
+ * instance of this tracker in your context.
+ * @param zkw ZooKeeperWatcher to use
+ * @return ServerName stored in the master address znode.
+ * @throws KeeperException
+ * @throws IOException if the master address znode data is null
+ */
+ public static ServerName getMasterAddress(final ZooKeeperWatcher zkw)
+ throws KeeperException, IOException {
+ byte [] data = ZKUtil.getData(zkw, zkw.getMasterAddressZNode());
+ if (data == null) {
+ throw new IOException("Can't get master address from ZooKeeper; znode data == null");
+ }
+ return bytesToServerName(data);
+ }
+
+ /**
+ * Set master address into the master znode or into the backup
+ * subdirectory of backup masters; the znode path passed in
+ * determines which.
+ * @param zkw The ZooKeeperWatcher to use.
+ * @param znode Where to create the znode; could be at the top level or it
+ * could be under backup masters
+ * @param master ServerName of the current master
+ * @return true if node created, false if not; a watch is set in both cases
+ * @throws KeeperException
+ */
+ public static boolean setMasterAddress(final ZooKeeperWatcher zkw,
+ final String znode, final ServerName master)
+ throws KeeperException {
+ return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, getZNodeData(master));
+ }
+
+ /**
+ * Check if there is a master available.
+ * @return true if there is a master set, false if not.
+ */
+ public boolean hasMaster() {
+ return super.getData(false) != null;
+ }
+
+ /**
+ * @param bytes Byte array of znode content
+ * @return A {@link ServerName} instance.
+ */
+ private static ServerName bytesToServerName(final byte [] bytes) {
+ return bytes == null ? null: ZKUtil.dataToServerName(bytes);
+ }
+
+ /**
+ * @param sn
+ * @return Content of the master znode as a serialized pb with the pb
+ * magic as prefix.
+ */
+ static byte [] getZNodeData(final ServerName sn) {
+ ZooKeeperProtos.Master.Builder mbuilder = ZooKeeperProtos.Master.newBuilder();
+ HBaseProtos.ServerName.Builder snbuilder = HBaseProtos.ServerName.newBuilder();
+ snbuilder.setHostName(sn.getHostname());
+ snbuilder.setPort(sn.getPort());
+ snbuilder.setStartCode(sn.getStartcode());
+ mbuilder.setMaster(snbuilder.build());
+ return ProtobufUtil.prependPBMagic(mbuilder.build().toByteArray());
+ }
+}
\ No newline at end of file
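A brief usage sketch for the relocated tracker, assuming a ZooKeeperWatcher and an Abortable are already in scope (both illustrative); the static variant serves callers without a running tracker instance:

    // Tracking variant: start() registers the listener and primes the data.
    MasterAddressTracker tracker = new MasterAddressTracker(watcher, abortable);
    tracker.start();
    if (tracker.hasMaster()) {
      ServerName active = tracker.getMasterAddress();  // null if master gone
    }

    // One-shot variant: reads the master znode directly and throws
    // IOException when the znode carries no data.
    ServerName current = MasterAddressTracker.getMasterAddress(watcher);

Note that the znode content written by getZNodeData(ServerName) is the pb magic prefix followed by a serialized ZooKeeperProtos.Master, so readers are expected to strip the prefix (ProtobufUtil.lengthOfPBMagic()) before parsing.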
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java
deleted file mode 100644
index 6c723b4..0000000
--- src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java
+++ /dev/null
@@ -1,828 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.Writable;
-import org.apache.zookeeper.KeeperException;
-
-@InterfaceAudience.Private
-public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker {
- public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class);
- private final MasterServices masterServices;
- // Used by tests only. Do not change this.
- private volatile int sleepTimeMillis = 0;
- // schema changes pending more than this time will be timed out.
- private long schemaChangeTimeoutMillis = 30000;
-
- /**
- * Constructs a new ZK node tracker.
- *
- * After construction, use {@link #start} to kick off tracking.
- *
- * @param watcher
- * @param abortable
- */
- public MasterSchemaChangeTracker(ZooKeeperWatcher watcher,
- Abortable abortable, MasterServices masterServices,
- long schemaChangeTimeoutMillis) {
- super(watcher, watcher.schemaZNode, abortable);
- this.masterServices = masterServices;
- this.schemaChangeTimeoutMillis = schemaChangeTimeoutMillis;
- }
-
- @Override
- public void start() {
- try {
- watcher.registerListener(this);
- List<String> tables =
- ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
- processCompletedSchemaChanges(tables);
- } catch (KeeperException e) {
- LOG.error("MasterSchemaChangeTracker startup failed.", e);
- abortable.abort("MasterSchemaChangeTracker startup failed", e);
- }
- }
-
- private List<String> getCurrentTables() throws KeeperException {
- return
- ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
- }
-
- /**
- * When a primary master crashes and the secondary master takes over
- * mid-flight during an alter process, the secondary should cleanup any completed
- * schema changes not handled by the previous master.
- * @param tables
- * @throws KeeperException
- */
- private void processCompletedSchemaChanges(List<String> tables)
- throws KeeperException {
- if (tables == null || tables.isEmpty()) {
- String msg = "No current schema change in progress. Skipping cleanup";
- LOG.debug(msg);
- return;
- }
- String msg = "Master seeing following tables undergoing schema change " +
- "process. Tables = " + tables;
- MonitoredTask status = TaskMonitor.get().createStatus(msg);
- LOG.debug(msg);
- for (String table : tables) {
- LOG.debug("Processing table = "+ table);
- status.setStatus("Processing table = "+ table);
- try {
- processTableNode(table);
- } catch (IOException e) {
- String errmsg = "IOException while processing completed schema changes."
- + " Cause = " + e.getCause();
- LOG.error(errmsg, e);
- status.setStatus(errmsg);
- }
- }
- }
-
- /**
- * Get current alter status for a table.
- * @param tableName
- * @return MasterAlterStatus
- * @throws KeeperException
- * @throws IOException
- */
- public MasterAlterStatus getMasterAlterStatus(String tableName)
- throws KeeperException, IOException {
- String path = getSchemaChangeNodePathForTable(tableName);
- byte[] state = ZKUtil.getData(watcher, path);
- if (state == null || state.length <= 0) {
- return null;
- }
- MasterAlterStatus mas = new MasterAlterStatus();
- Writables.getWritable(state, mas);
- return mas;
- }
-
- /**
- * Get RS specific alter status for a table & server
- * @param tableName
- * @param serverName
- * @return Region Server's Schema alter status
- * @throws KeeperException
- * @throws IOException
- */
- private SchemaChangeTracker.SchemaAlterStatus getRSSchemaAlterStatus(
- String tableName, String serverName)
- throws KeeperException, IOException {
- String childPath =
- getSchemaChangeNodePathForTableAndServer(tableName, serverName);
- byte[] childData = ZKUtil.getData(this.watcher, childPath);
- if (childData == null || childData.length <= 0) {
- return null;
- }
- SchemaChangeTracker.SchemaAlterStatus sas =
- new SchemaChangeTracker.SchemaAlterStatus();
- Writables.getWritable(childData, sas);
- LOG.debug("Schema Status data for server = " + serverName + " table = "
- + tableName + " == " + sas);
- return sas;
- }
-
- /**
- * Update the master's alter status based on all region server's response.
- * @param servers
- * @param tableName
- * @throws IOException
- */
- private void updateMasterAlterStatus(MasterAlterStatus mas,
- List<String> servers, String tableName)
- throws IOException, KeeperException {
- for (String serverName : servers) {
- SchemaChangeTracker.SchemaAlterStatus sas =
- getRSSchemaAlterStatus(tableName, serverName);
- if (sas != null) {
- mas.update(sas);
- LOG.debug("processTableNodeWithState:Updated Master Alter Status = "
- + mas + " for server = " + serverName);
- } else {
- LOG.debug("SchemaAlterStatus is NULL for table = " + tableName);
- }
- }
- }
-
- /**
- * If schema alter is handled for this table, then delete all the ZK nodes
- * created for this table.
- * @param tableName
- * @throws KeeperException
- */
- private void processTableNode(String tableName) throws KeeperException,
- IOException {
- LOG.debug("processTableNodeWithState. TableName = " + tableName);
- List<String> servers =
- ZKUtil.listChildrenAndWatchThem(watcher,
- getSchemaChangeNodePathForTable(tableName));
- MasterAlterStatus mas = getMasterAlterStatus(tableName);
- if (mas == null) {
- LOG.debug("MasterAlterStatus is NULL. Table = " + tableName);
- return;
- }
- updateMasterAlterStatus(mas, servers, tableName);
- LOG.debug("Current Alter status = " + mas);
- String nodePath = getSchemaChangeNodePathForTable(tableName);
- ZKUtil.updateExistingNodeData(this.watcher, nodePath,
- Writables.getBytes(mas), getZKNodeVersion(nodePath));
- processAlterStatus(mas, tableName, servers);
- }
-
- /**
- * Evaluate the master alter status and determine the current status.
- * @param alterStatus
- * @param tableName
- * @param servers
- * @param status
- */
- private void processAlterStatus(MasterAlterStatus alterStatus,
- String tableName, List<String> servers)
- throws KeeperException {
- if (alterStatus.getNumberOfRegionsToProcess()
- == alterStatus.getNumberOfRegionsProcessed()) {
- // schema change completed.
- String msg = "All region servers have successfully processed the " +
- "schema changes for table = " + tableName
- + " . Deleting the schema change node for table = "
- + tableName + " Region servers processed the schema change" +
- " request = " + alterStatus.getProcessedHosts()
- + " Total number of regions = " + alterStatus.getNumberOfRegionsToProcess()
- + " Processed regions = " + alterStatus.getNumberOfRegionsProcessed();
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Checking alter schema request status for table = " + tableName);
- status.markComplete(msg);
- LOG.debug(msg);
- cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName));
- } else {
- if (alterStatus.getErrorCause() != null
- && alterStatus.getErrorCause().trim().length() > 0) {
- String msg = "Alter schema change failed "
- + "for table = " + tableName + " Number of online regions = "
- + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = "
- + alterStatus.getNumberOfRegionsProcessed()
- + " Original list = " + alterStatus.hostsToProcess + " Processed servers = "
- + servers
- + " Error Cause = " + alterStatus.getErrorCause();
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Checking alter schema request status for table = " + tableName);
- // we have errors.
- LOG.debug(msg);
- status.abort(msg);
- } else {
- String msg = "Not all region servers have processed the schema changes"
- + "for table = " + tableName + " Number of online regions = "
- + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = "
- + alterStatus.getNumberOfRegionsProcessed()
- + " Original list = " + alterStatus.hostsToProcess + " Processed servers = "
- + servers + " Alter STate = "
- + alterStatus.getCurrentAlterStatus();
- LOG.debug(msg);
- // status.setStatus(msg);
- }
- }
- }
-
- /**
- * Check whether a in-flight schema change request has expired.
- * @param tableName
- * @return true is the schema change request expired.
- * @throws IOException
- */
- private boolean hasSchemaChangeExpiredFor(String tableName)
- throws IOException, KeeperException {
- MasterAlterStatus mas = getMasterAlterStatus(tableName);
- long createdTimeStamp = mas.getStamp();
- long duration = System.currentTimeMillis() - createdTimeStamp;
- LOG.debug("Created TimeStamp = " + createdTimeStamp
- + " duration = " + duration + " Table = " + tableName
- + " Master Alter Status = " + mas);
- return (duration > schemaChangeTimeoutMillis);
- }
-
- /**
- * Handle failed and expired schema changes. We simply delete all the
- * expired/failed schema change attempts. Why we should do this ?
- * 1) Keeping the failed/expired schema change nodes longer prohibits any
- * future schema changes for the table.
- * 2) Any lingering expired/failed schema change requests will prohibit the
- * load balancer from running.
- */
- public void handleFailedOrExpiredSchemaChanges() {
- try {
- List<String> tables = getCurrentTables();
- for (String table : tables) {
- String statmsg = "Cleaning failed or expired schema change requests. " +
- "current tables undergoing " +
- "schema change process = " + tables;
- MonitoredTask status = TaskMonitor.get().createStatus(statmsg);
- LOG.debug(statmsg);
- if (hasSchemaChangeExpiredFor(table)) {
- // time out.. currently, we abandon the in-flight schema change due to
- // time out.
- // Here, there are couple of options to consider. One could be to
- // attempt a retry of the schema change and see if it succeeds, and
- // another could be to simply rollback the schema change effort and
- // see if it succeeds.
- String msg = "Schema change for table = " + table + " has expired."
- + " Schema change for this table has been in progress for " +
- + schemaChangeTimeoutMillis +
- "Deleting the node now.";
- LOG.debug(msg);
- ZKUtil.deleteNodeRecursively(this.watcher,
- getSchemaChangeNodePathForTable(table));
- } else {
- String msg = "Schema change request is in progress for " +
- " table = " + table;
- LOG.debug(msg);
- status.setStatus(msg);
- }
- }
- } catch (IOException e) {
- String msg = "IOException during handleFailedExpiredSchemaChanges."
- + e.getCause();
- LOG.error(msg, e);
- TaskMonitor.get().createStatus(msg);
- } catch (KeeperException ke) {
- String msg = "KeeperException during handleFailedExpiredSchemaChanges."
- + ke.getCause();
- LOG.error(msg, ke);
- TaskMonitor.get().createStatus(msg);
- }
- }
-
- /**
- * Clean the nodes of completed schema change table.
- * @param path
- * @throws KeeperException
- */
- private void cleanProcessedTableNode(String path) throws KeeperException {
- if (sleepTimeMillis > 0) {
- try {
- LOG.debug("Master schema change tracker sleeping for "
- + sleepTimeMillis);
- Thread.sleep(sleepTimeMillis);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- ZKUtil.deleteNodeRecursively(this.watcher, path);
- LOG.debug("Deleted all nodes for path " + path);
-
- }
-
- /**
- * Exclude a RS from schema change request (if applicable)
- * We will exclude a RS from schema change request processing if 1) RS
- * has online regions for the table AND 2) RS went down mid-flight
- * during schema change process. We don't have to deal with RS going
- * down mid-flight during a schema change as the online regions from
- * the dead RS will get reassigned to some other RS and the
- * process of reassign inherently takes care of the schema change as well.
- * @param serverName
- */
- public void excludeRegionServerForSchemaChanges(String serverName) {
- try {
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Processing schema change exclusion for region server = " + serverName);
- List<String> tables =
- ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
- if (tables == null || tables.isEmpty()) {
- String msg = "No schema change in progress. Skipping exclusion for " +
- "server = "+ serverName;
- LOG.debug(msg);
- status.setStatus(msg);
- return ;
- }
- for (String tableName : tables) {
- excludeRegionServer(tableName, serverName, status);
- }
- } catch(KeeperException ke) {
- LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke);
- } catch(IOException ioe) {
- LOG.error("IOException during excludeRegionServerForSchemaChanges", ioe);
-
- }
- }
-
- /**
- * Check whether a schema change is in progress for a given table on a
- * given RS.
- * @param tableName
- * @param serverName
- * @return TRUE is this RS is currently processing a schema change request
- * for the table.
- * @throws KeeperException
- */
- private boolean isSchemaChangeApplicableFor(String tableName,
- String serverName)
- throws KeeperException {
- List<String> servers = ZKUtil.listChildrenAndWatchThem(watcher,
- getSchemaChangeNodePathForTable(tableName));
- return (servers.contains(serverName));
- }
-
- /**
- * Exclude a region server for a table (if applicable) from schema change processing.
- * @param tableName
- * @param serverName
- * @param status
- * @throws KeeperException
- * @throws IOException
- */
- private void excludeRegionServer(String tableName, String serverName,
- MonitoredTask status)
- throws KeeperException, IOException {
- if (isSchemaChangeApplicableFor(tableName, serverName)) {
- String msg = "Excluding RS " + serverName + " from schema change process" +
- " for table = " + tableName;
- LOG.debug(msg);
- status.setStatus(msg);
- SchemaChangeTracker.SchemaAlterStatus sas =
- getRSSchemaAlterStatus(tableName, serverName);
- if (sas == null) {
- LOG.debug("SchemaAlterStatus is NULL for table = " + tableName
- + " server = " + serverName);
- return;
- }
- // Set the status to IGNORED so we can process it accordingly.
- sas.setCurrentAlterStatus(
- SchemaChangeTracker.SchemaAlterStatus.AlterState.IGNORED);
- LOG.debug("Updating the current schema status to " + sas);
- String nodePath = getSchemaChangeNodePathForTableAndServer(tableName,
- serverName);
- ZKUtil.updateExistingNodeData(this.watcher,
- nodePath, Writables.getBytes(sas), getZKNodeVersion(nodePath));
- } else {
- LOG.debug("Skipping exclusion of RS " + serverName
- + " from schema change process"
- + " for table = " + tableName
- + " as it did not possess any online regions for the table");
- }
- processTableNode(tableName);
- }
-
- private int getZKNodeVersion(String nodePath) throws KeeperException {
- return ZKUtil.checkExists(this.watcher, nodePath);
- }
-
- /**
- * Create a new schema change ZK node.
- * @param tableName Table name that is getting altered
- * @throws KeeperException
- */
- public void createSchemaChangeNode(String tableName,
- int numberOfRegions)
- throws KeeperException, IOException {
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Creating schema change node for table = " + tableName);
- LOG.debug("Creating schema change node for table = "
- + tableName + " Path = "
- + getSchemaChangeNodePathForTable(tableName));
- if (doesSchemaChangeNodeExists(tableName)) {
- LOG.debug("Schema change node already exists for table = " + tableName
- + " Deleting the schema change node.");
- // If we already see a schema change node for this table we wait till the previous
- // alter process is complete. Ideally, we need not wait and we could simply delete
- // existing schema change node for this table and create new one. But then the
- // RS cloud will not be able to process concurrent schema updates for the same table
- // as they will be working with same set of online regions for this table. Meaning the
- // second alter change will not see any online regions (as they were being closed and
- // re opened by the first change) and will miss the second one.
- // We either handle this at the RS level using explicit locks while processing a table
- // or do it here. I prefer doing it here as it seems much simpler and cleaner.
- while(doesSchemaChangeNodeExists(tableName)) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode);
- // if number of online RS = 0, we should not do anything!
- if (rsCount <= 0) {
- String msg = "Master is not seeing any online region servers. Aborting the " +
- "schema change processing by region servers.";
- LOG.debug(msg);
- status.abort(msg);
- } else {
- LOG.debug("Master is seeing " + rsCount + " region servers online before " +
- "the schema change process.");
- MasterAlterStatus mas = new MasterAlterStatus(numberOfRegions,
- getActiveRegionServersAsString());
- LOG.debug("Master creating the master alter status = " + mas);
- ZKUtil.createSetData(this.watcher,
- getSchemaChangeNodePathForTable(tableName), Writables.getBytes(mas));
- status.markComplete("Created the ZK node for schema change. Current Alter Status = "
- + mas.toString());
- ZKUtil.listChildrenAndWatchThem(this.watcher,
- getSchemaChangeNodePathForTable(tableName));
- }
- }
-
- private String getActiveRegionServersAsString() {
- StringBuffer sbuf = new StringBuffer();
- List<ServerName> currentRS =
- masterServices.getRegionServerTracker().getOnlineServers();
- for (ServerName serverName : currentRS) {
- sbuf.append(serverName.getServerName());
- sbuf.append(" ");
- }
- LOG.debug("Current list of RS to process the schema change = "
- + sbuf.toString());
- return sbuf.toString();
- }
-
- /**
- * Create a new schema change ZK node.
- * @param tableName
- * @throws KeeperException
- */
- public boolean doesSchemaChangeNodeExists(String tableName)
- throws KeeperException {
- return ZKUtil.checkExists(watcher,
- getSchemaChangeNodePathForTable(tableName)) != -1;
- }
-
- /**
- * Check whether there are any schema change requests that are in progress now.
- * We simply assume that a schema change is in progress if we see a ZK schema node for
- * any table. We may revisit for fine grained checks such as check the current alter status
- * et al, but it is not required now.
- * @return
- */
- public boolean isSchemaChangeInProgress() {
- try {
- int schemaChangeCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.schemaZNode);
- return schemaChangeCount > 0;
- } catch (KeeperException ke) {
- LOG.debug("KeeperException while getting current schema change progress.");
- // What do we do now??? currently reporting as false.
- }
- return false;
- }
-
- /**
- * We get notified when a RS processes/or completed the schema change request.
- * The path will be of the format /hbase/schema/<table name>
- * @param path full path of the node whose children have changed
- */
- @Override
- public void nodeChildrenChanged(String path) {
- String tableName = null;
- if (path.startsWith(watcher.schemaZNode) &&
- !path.equals(watcher.schemaZNode)) {
- try {
- LOG.debug("NodeChildrenChanged Path = " + path);
- tableName = path.substring(path.lastIndexOf("/")+1, path.length());
- processTableNode(tableName);
- } catch (KeeperException e) {
- TaskMonitor.get().createStatus(
- "MasterSchemaChangeTracker: ZK exception while processing " +
- " nodeChildrenChanged() event for table = " + tableName
- + " Cause = " + e.getCause());
- LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
- + " schema change nodes", e);
- } catch(IOException ioe) {
- TaskMonitor.get().createStatus(
- "MasterSchemaChangeTracker: ZK exception while processing " +
- " nodeChildrenChanged() event for table = " + tableName
- + " Cause = " + ioe.getCause());
- LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
- + " schema change nodes", ioe);
- }
- }
- }
-
- /**
- * We get notified as and when the RS cloud updates their ZK nodes with
- * progress information. The path will be of the format
- * /hbase/schema/<table name>/<host name>
- * @param path
- */
- @Override
- public void nodeDataChanged(String path) {
- String tableName = null;
- if (path.startsWith(watcher.schemaZNode) &&
- !path.equals(watcher.schemaZNode)) {
- try {
- LOG.debug("NodeDataChanged Path = " + path);
- String[] paths = path.split("/");
- tableName = paths[3];
- processTableNode(tableName);
- } catch (KeeperException e) {
- TaskMonitor.get().createStatus(
- "MasterSchemaChangeTracker: ZK exception while processing " +
- " nodeDataChanged() event for table = " + tableName
- + " Cause = " + e.getCause());
- LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
- + " schema change nodes", e);
- } catch(IOException ioe) {
- TaskMonitor.get().createStatus(
- "MasterSchemaChangeTracker: IO exception while processing " +
- " nodeDataChanged() event for table = " + tableName
- + " Cause = " + ioe.getCause());
- LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
- + " schema change nodes", ioe);
-
- }
- }
- }
-
- public String getSchemaChangeNodePathForTable(String tableName) {
- return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
- }
-
- /**
- * Used only for tests. Do not use this. See TestInstantSchemaChange for more details
- * on how this is getting used. This is primarily used to delay the schema complete
- * processing by master so that we can test some complex scenarios such as
- * master failover.
- * @param sleepTimeMillis
- */
- public void setSleepTimeMillis(int sleepTimeMillis) {
- this.sleepTimeMillis = sleepTimeMillis;
- }
-
- private String getSchemaChangeNodePathForTableAndServer(
- String tableName, String regionServerName) {
- return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
- regionServerName);
- }
-
-
- /**
- * Holds the current alter state for a table. Alter state includes the
- * current alter status (INPROCESS, FAILURE or SUCCESS (success is not getting
- * used now.), timestamp of alter request, number of hosts online at the time
- * of alter request, number of online regions to process for the schema change
- * request, number of processed regions and a list of region servers that
- * actually processed the schema change request.
- *
- * Master keeps track of schema change requests using the alter status and
- * periodically updates the alter status based on RS cloud processings.
- */
- public static class MasterAlterStatus implements Writable {
-
- public enum AlterState {
- INPROCESS, // Inprocess alter
- SUCCESS, // completed alter
- FAILURE // failure alter
- }
-
- private AlterState currentAlterStatus;
- // TimeStamp
- private long stamp;
- private int numberOfRegionsToProcess;
- private StringBuffer errorCause = new StringBuffer(" ");
- private StringBuffer processedHosts = new StringBuffer(" ");
- private String hostsToProcess;
- private int numberOfRegionsProcessed = 0;
-
- public MasterAlterStatus() {
-
- }
-
- public MasterAlterStatus(int numberOfRegions, String activeHosts) {
- this.numberOfRegionsToProcess = numberOfRegions;
- this.stamp = System.currentTimeMillis();
- this.currentAlterStatus = AlterState.INPROCESS;
- //this.rsToProcess = activeHosts;
- this.hostsToProcess = activeHosts;
- }
-
- public AlterState getCurrentAlterStatus() {
- return currentAlterStatus;
- }
-
- public void setCurrentAlterStatus(AlterState currentAlterStatus) {
- this.currentAlterStatus = currentAlterStatus;
- }
-
- public long getStamp() {
- return stamp;
- }
-
- public void setStamp(long stamp) {
- this.stamp = stamp;
- }
-
- public int getNumberOfRegionsToProcess() {
- return numberOfRegionsToProcess;
- }
-
- public void setNumberOfRegionsToProcess(int numberOfRegionsToProcess) {
- this.numberOfRegionsToProcess = numberOfRegionsToProcess;
- }
-
- public int getNumberOfRegionsProcessed() {
- return numberOfRegionsProcessed;
- }
-
- public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
- this.numberOfRegionsProcessed += numberOfRegionsProcessed;
- }
-
- public String getHostsToProcess() {
- return hostsToProcess;
- }
-
- public void setHostsToProcess(String hostsToProcess) {
- this.hostsToProcess = hostsToProcess;
- }
-
- public String getErrorCause() {
- return errorCause == null ? null : errorCause.toString();
- }
-
- public void setErrorCause(String errorCause) {
- if (errorCause == null || errorCause.trim().length() <= 0) {
- return;
- }
- if (this.errorCause == null) {
- this.errorCause = new StringBuffer(errorCause);
- } else {
- this.errorCause.append(errorCause);
- }
- }
-
- public String getProcessedHosts() {
- return processedHosts.toString();
- }
-
- public void setProcessedHosts(String processedHosts) {
- if (this.processedHosts == null) {
- this.processedHosts = new StringBuffer(processedHosts);
- } else {
- this.processedHosts.append(" ").append(processedHosts);
- }
- }
-
- /**
- * Ignore or exempt a RS from schema change processing.
- * Master will tweak the number of regions to process based on the
- * number of online regions on the target RS and also remove the
- * RS from list of hosts to process.
- * @param schemaAlterStatus
- */
- private void ignoreRSForSchemaChange(
- SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
- LOG.debug("Removing RS " + schemaAlterStatus.getHostName()
- + " from schema change process.");
- hostsToProcess =
- hostsToProcess.replaceAll(schemaAlterStatus.getHostName(), "");
- int ignoreRegionsCount = schemaAlterStatus.getNumberOfOnlineRegions();
- LOG.debug("Current number of regions processed = "
- + this.numberOfRegionsProcessed + " deducting ignored = "
- + ignoreRegionsCount
- + " final = " + (this.numberOfRegionsToProcess-ignoreRegionsCount));
- if (this.numberOfRegionsToProcess > 0) {
- this.numberOfRegionsToProcess -= ignoreRegionsCount;
- } else {
- LOG.debug("Number of regions to process is less than zero. This is odd");
- }
- }
-
- /**
- * Update the master alter status for this table based on RS alter status.
- * @param schemaAlterStatus
- */
- public void update(SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
- this.setProcessedHosts(schemaAlterStatus.getHostName());
- SchemaChangeTracker.SchemaAlterStatus.AlterState rsState =
- schemaAlterStatus.getCurrentAlterStatus();
- switch(rsState) {
- case FAILURE:
- LOG.debug("Schema update failure Status = "
- + schemaAlterStatus);
- this.setCurrentAlterStatus(
- MasterAlterStatus.AlterState.FAILURE);
- this.setNumberOfRegionsProcessed(
- schemaAlterStatus.getNumberOfRegionsProcessed());
- this.setErrorCause(schemaAlterStatus.getErrorCause());
- break;
- case SUCCESS:
- LOG.debug("Schema update SUCCESS Status = "
- + schemaAlterStatus);
- this.setNumberOfRegionsProcessed(
- schemaAlterStatus.getNumberOfRegionsProcessed());
- this.setCurrentAlterStatus(MasterAlterStatus.AlterState.SUCCESS);
- break;
- case IGNORED:
- LOG.debug("Schema update IGNORED Updating regions to " +
- "process count. Status = "+ schemaAlterStatus);
- ignoreRSForSchemaChange(schemaAlterStatus);
- break;
- default:
- break;
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- currentAlterStatus = AlterState.valueOf(in.readUTF());
- stamp = in.readLong();
- numberOfRegionsToProcess = in.readInt();
- hostsToProcess = Bytes.toString(Bytes.readByteArray(in));
- processedHosts = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
- errorCause = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(currentAlterStatus.name());
- out.writeLong(stamp);
- out.writeInt(numberOfRegionsToProcess);
- Bytes.writeByteArray(out, Bytes.toBytes(hostsToProcess));
- Bytes.writeByteArray(out, Bytes.toBytes(processedHosts.toString()));
- Bytes.writeByteArray(out, Bytes.toBytes(errorCause.toString()));
- }
-
- @Override
- public String toString() {
- return
- " state= " + currentAlterStatus
- + ", ts= " + stamp
- + ", number of regions to process = " + numberOfRegionsToProcess
- + ", number of regions processed = " + numberOfRegionsProcessed
- + ", hosts = " + hostsToProcess
- + " , processed hosts = " + processedHosts
- + " , errorCause = " + errorCause;
- }
- }
-}
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
index 6b2ea57..1cb693d 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java
@@ -64,7 +64,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
* @throws InterruptedException
*/
public ServerName getRootRegionLocation() throws InterruptedException {
- return dataToServerName(super.getData(true));
+ return ZKUtil.dataToServerName(super.getData(true));
}
/**
@@ -76,7 +76,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
*/
public static ServerName getRootRegionLocation(final ZooKeeperWatcher zkw)
throws KeeperException {
- return dataToServerName(ZKUtil.getData(zkw, zkw.rootServerZNode));
+ return ZKUtil.dataToServerName(ZKUtil.getData(zkw, zkw.rootServerZNode));
}
/**
@@ -97,7 +97,7 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
LOG.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
- return dataToServerName(super.blockUntilAvailable(timeout, true));
+ return ZKUtil.dataToServerName(super.blockUntilAvailable(timeout, true));
}
/**
@@ -164,43 +164,6 @@ public class RootRegionTracker extends ZooKeeperNodeTracker {
final long timeout)
throws InterruptedException {
byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.rootServerZNode, timeout);
- return dataToServerName(data);
- }
-
- /**
- * @param data
- * @return Returns null if data is null else converts passed data
- * to a ServerName instance.
- */
- static ServerName dataToServerName(final byte [] data) {
- if (data == null || data.length <= 0) return null;
- if (ProtobufUtil.isPBMagicPrefix(data)) {
- int prefixLen = ProtobufUtil.lengthOfPBMagic();
- try {
- RootRegionServer rss =
- RootRegionServer.newBuilder().mergeFrom(data, prefixLen, data.length - prefixLen).build();
- HBaseProtos.ServerName sn = rss.getServer();
- return new ServerName(sn.getHostName(), sn.getPort(), sn.getStartCode());
- } catch (InvalidProtocolBufferException e) {
- // A failed parse of the znode is pretty catastrophic. Rather than loop
- // retrying hoping the bad bytes will changes, and rather than change
- // the signature on this method to add an IOE which will send ripples all
- // over the code base, throw a RuntimeException. This should "never" happen.
- throw new RuntimeException(e);
- }
- }
- // The str returned could be old style -- pre hbase-1502 -- which was
- // hostname and port seperated by a colon rather than hostname, port and
- // startcode delimited by a ','.
- String str = Bytes.toString(data);
- int index = str.indexOf(ServerName.SERVERNAME_SEPARATOR);
- if (index != -1) {
- // Presume its ServerName.toString() format.
- return ServerName.parseServerName(str);
- }
- // Presume it a hostname:port format.
- String hostname = Addressing.parseHostname(str);
- int port = Addressing.parsePort(str);
- return new ServerName(hostname, port, -1L);
+ return ZKUtil.dataToServerName(data);
}
}
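The parsing that used to live here has moved to ZKUtil.dataToServerName, which, per the deleted branch logic above, still needs to accept three encodings (a sketch of the expected behavior, not of the ZKUtil implementation itself):

    // 1) Current format: pb magic prefix + serialized RootRegionServer pb.
    // 2) ServerName.toString() format: "host,port,startcode".
    ServerName a = ZKUtil.dataToServerName(
        Bytes.toBytes("host.example.org,60020,1234567890"));  // illustrative
    // 3) Pre hbase-1502 "hostname:port" format; startcode defaults to -1.
    ServerName b = ZKUtil.dataToServerName(
        Bytes.toBytes("host.example.org:60020"));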
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java
deleted file mode 100644
index 48d4ff7..0000000
--- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.zookeeper;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.zookeeper.KeeperException;
-import org.apache.hadoop.hbase.util.Writables;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.*;
-import java.util.List;
-
-/**
- * Region server schema change tracker. RS uses this tracker to keep track of
- * alter schema requests from master and updates the status once the schema change
- * is complete.
- */
-@InterfaceAudience.Private
-public class SchemaChangeTracker extends ZooKeeperNodeTracker {
- public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class);
- private RegionServerServices regionServer = null;
- private volatile int sleepTimeMillis = 0;
-
-
- /**
- * Constructs a new ZK node tracker.
- *
- * After construction, use {@link #start} to kick off tracking.
- *
- * @param watcher
- * @param node
- * @param abortable
- */
- public SchemaChangeTracker(ZooKeeperWatcher watcher,
- Abortable abortable,
- RegionServerServices regionServer) {
- super(watcher, watcher.schemaZNode, abortable);
- this.regionServer = regionServer;
- }
-
- @Override
- public void start() {
- try {
- watcher.registerListener(this);
- ZKUtil.listChildrenAndWatchThem(watcher, node);
- // Clean-up old in-process schema changes for this RS now?
- } catch (KeeperException e) {
- LOG.error("RegionServer SchemaChangeTracker startup failed with " +
- "KeeperException.", e);
- }
- }
-
-
- /**
- * This event will be triggered whenever a new schema change request is processed by the
- * master. The path will be of the format /hbase/schema/<table name>
- * @param path full path of the node whose children have changed
- */
- @Override
- public void nodeChildrenChanged(String path) {
- LOG.debug("NodeChildrenChanged. Path = " + path);
- if (path.equals(watcher.schemaZNode)) {
- try {
- List<String> tables =
- ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode);
- LOG.debug("RS.SchemaChangeTracker: " +
- "Current list of tables with schema change = " + tables);
- if (tables != null) {
- handleSchemaChange(tables);
- } else {
- LOG.error("No tables found for schema change event." +
- " Skipping instant schema refresh");
- }
- } catch (KeeperException ke) {
- String errmsg = "KeeperException while handling nodeChildrenChanged for path = "
- + path + " Cause = " + ke.getCause();
- LOG.error(errmsg, ke);
- TaskMonitor.get().createStatus(errmsg);
- }
- }
- }
-
- private void handleSchemaChange(List<String> tables) {
- for (String tableName : tables) {
- if (tableName != null) {
- LOG.debug("Processing schema change with status for table = " + tableName);
- handleSchemaChange(tableName);
- }
- }
- }
-
- private void handleSchemaChange(String tableName) {
- int refreshedRegionsCount = 0, onlineRegionsCount = 0;
- MonitoredTask status = null;
- try {
- List<HRegion> onlineRegions =
- regionServer.getOnlineRegions(Bytes.toBytes(tableName));
- if (onlineRegions != null && !onlineRegions.isEmpty()) {
- status = TaskMonitor.get().createStatus("Region server "
- + regionServer.getServerName().getServerName()
- + " handling schema change for table = " + tableName
- + " number of online regions = " + onlineRegions.size());
- onlineRegionsCount = onlineRegions.size();
- createStateNode(tableName, onlineRegions.size());
- for (HRegion hRegion : onlineRegions) {
- regionServer.refreshRegion(hRegion);
- refreshedRegionsCount ++;
- }
- SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
- alterStatus.update(SchemaAlterStatus.AlterState.SUCCESS, refreshedRegionsCount);
- updateSchemaChangeStatus(tableName, alterStatus);
- String msg = "Refresh schema completed for table name = " + tableName
- + " server = " + regionServer.getServerName().getServerName()
- + " online Regions = " + onlineRegions.size()
- + " refreshed Regions = " + refreshedRegionsCount;
- LOG.debug(msg);
- status.setStatus(msg);
- } else {
- LOG.debug("Server " + regionServer.getServerName().getServerName()
- + " has no online regions for table = " + tableName
- + " Ignoring the schema change request");
- }
- } catch (IOException ioe) {
- reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
- refreshedRegionsCount, ioe, status);
- } catch (KeeperException ke) {
- reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
- refreshedRegionsCount, ke, status);
- }
- }
-
- private int getZKNodeVersion(String nodePath) throws KeeperException {
- return ZKUtil.checkExists(this.watcher, nodePath);
- }
-
- private void reportAndLogSchemaRefreshError(String tableName,
- int onlineRegionsCount,
- int refreshedRegionsCount,
- Throwable exception,
- MonitoredTask status) {
- try {
- String errmsg =
- " Region Server " + regionServer.getServerName().getServerName()
- + " failed during schema change process. Cause = "
- + exception.getCause()
- + " Number of onlineRegions = " + onlineRegionsCount
- + " Processed regions = " + refreshedRegionsCount;
- SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
- alterStatus.update(SchemaAlterStatus.AlterState.FAILURE,
- refreshedRegionsCount, errmsg);
- String nodePath = getSchemaChangeNodePathForTableAndServer(tableName,
- regionServer.getServerName().getServerName());
- ZKUtil.updateExistingNodeData(this.watcher, nodePath,
- Writables.getBytes(alterStatus), getZKNodeVersion(nodePath));
- LOG.info("reportAndLogSchemaRefreshError() " +
- " Updated child ZKNode with SchemaAlterStatus = "
- + alterStatus + " for table = " + tableName);
- if (status == null) {
- status = TaskMonitor.get().createStatus(errmsg);
- } else {
- status.setStatus(errmsg);
- }
- } catch (KeeperException e) {
- // Retry ?
- String errmsg = "KeeperException while updating the schema change node with "
- + "error status for table = "
- + tableName + " server = "
- + regionServer.getServerName().getServerName()
- + " Cause = " + e.getCause();
- LOG.error(errmsg, e);
- TaskMonitor.get().createStatus(errmsg);
- } catch(IOException ioe) {
- // retry ??
- String errmsg = "IOException while updating the schema change node with "
- + "server name for table = "
- + tableName + " server = "
- + regionServer.getServerName().getServerName()
- + " Cause = " + ioe.getCause();
- TaskMonitor.get().createStatus(errmsg);
- LOG.error(errmsg, ioe);
- }
- }
-
-
- private void createStateNode(String tableName, int numberOfOnlineRegions)
- throws IOException {
- SchemaAlterStatus sas =
- new SchemaAlterStatus(regionServer.getServerName().getServerName(),
- numberOfOnlineRegions);
- LOG.debug("Creating Schema Alter State node = " + sas);
- try {
- ZKUtil.createSetData(this.watcher,
- getSchemaChangeNodePathForTableAndServer(tableName,
- regionServer.getServerName().getServerName()),
- Writables.getBytes(sas));
- } catch (KeeperException ke) {
- String errmsg = "KeeperException while creating the schema change node with "
- + "server name for table = "
- + tableName + " server = "
- + regionServer.getServerName().getServerName()
- + " Message = " + ke.getCause();
- LOG.error(errmsg, ke);
- TaskMonitor.get().createStatus(errmsg);
- }
-
- }
-
- private SchemaAlterStatus getSchemaAlterStatus(String tableName)
- throws KeeperException, IOException {
- byte[] statusBytes = ZKUtil.getData(this.watcher,
- getSchemaChangeNodePathForTableAndServer(tableName,
- regionServer.getServerName().getServerName()));
- if (statusBytes == null || statusBytes.length <= 0) {
- return null;
- }
- SchemaAlterStatus sas = new SchemaAlterStatus();
- Writables.getWritable(statusBytes, sas);
- return sas;
- }
-
- private void updateSchemaChangeStatus(String tableName,
- SchemaAlterStatus schemaAlterStatus)
- throws KeeperException, IOException {
- try {
- if(sleepTimeMillis > 0) {
- try {
- LOG.debug("SchemaChangeTracker sleeping for "
- + sleepTimeMillis);
- Thread.sleep(sleepTimeMillis);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- ZKUtil.updateExistingNodeData(this.watcher,
- getSchemaChangeNodePathForTableAndServer(tableName,
- regionServer.getServerName().getServerName()),
- Writables.getBytes(schemaAlterStatus), -1);
- String msg = "Schema change tracker completed for table = " + tableName
- + " status = " + schemaAlterStatus;
- LOG.debug(msg);
- TaskMonitor.get().createStatus(msg);
- } catch (KeeperException.NoNodeException e) {
- String errmsg = "KeeperException.NoNodeException while updating the schema "
- + "change node with server name for table = "
- + tableName + " server = "
- + regionServer.getServerName().getServerName()
- + " Cause = " + e.getCause();
- TaskMonitor.get().createStatus(errmsg);
- LOG.error(errmsg, e);
- } catch (KeeperException e) {
- // Retry ?
- String errmsg = "KeeperException while updating the schema change node with "
- + "server name for table = "
- + tableName + " server = "
- + regionServer.getServerName().getServerName()
- + " Cause = " + e.getCause();
- LOG.error(errmsg, e);
- TaskMonitor.get().createStatus(errmsg);
- } catch(IOException ioe) {
- String errmsg = "IOException while updating the schema change node with "
- + "server name for table = "
- + tableName + " server = "
- + regionServer.getServerName().getServerName()
- + " Cause = " + ioe.getCause();
- LOG.error(errmsg, ioe);
- TaskMonitor.get().createStatus(errmsg);
- }
- }
-
- private String getSchemaChangeNodePathForTable(String tableName) {
- return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
- }
-
- private String getSchemaChangeNodePathForTableAndServer(
- String tableName, String regionServerName) {
- return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
- regionServerName);
- }
-
- public int getSleepTimeMillis() {
- return sleepTimeMillis;
- }
-
- /**
- * Set a sleep time in millis before this RS can update its progress status.
- * Used only for test cases to test complex test scenarios such as RS failures and
- * RS exemption handling.
- * @param sleepTimeMillis
- */
- public void setSleepTimeMillis(int sleepTimeMillis) {
- this.sleepTimeMillis = sleepTimeMillis;
- }
-
- /**
- * Check whether there are any schema change requests that are in progress now for the given table.
- * We simply assume that a schema change is in progress if we see a ZK schema node for
- * the given table. We may revisit for fine grained checks such as checking the current
- * alter status et al, but it is not required now.
- * @return
- */
- public boolean isSchemaChangeInProgress(String tableName) {
- try {
- List<String> schemaChanges = ZKUtil.listChildrenAndWatchThem(this.watcher,
- watcher.schemaZNode);
- if (schemaChanges != null) {
- for (String alterTableName : schemaChanges) {
- if (alterTableName.equals(tableName)) {
- return true;
- }
- }
- return false;
- }
- } catch (KeeperException ke) {
- LOG.debug("isSchemaChangeInProgress. " +
- "KeeperException while getting current schema change progress.");
- return false;
- }
- return false;
- }
-
- /**
- * Holds the current alter state for a table. Alter state includes the
- * current alter status (INPROCESS, FAILURE, SUCCESS, or IGNORED), current RS
- * host name, timestamp of alter request, number of online regions this RS has for
- * the given table, number of processed regions and an errorCause in case
- * the RS failed during the schema change process.
- *
- * RS keeps track of schema change requests per table using the alter status and
- * periodically updates the alter status based on schema change status.
- */
- public static class SchemaAlterStatus implements Writable {
-
- public enum AlterState {
- INPROCESS, // Inprocess alter
- SUCCESS, // completed alter
- FAILURE, // failure alter
- IGNORED // Ignore the alter processing.
- }
-
- private AlterState currentAlterStatus;
- // TimeStamp
- private long stamp;
- private int numberOfOnlineRegions;
- private String errorCause = " ";
- private String hostName;
- private int numberOfRegionsProcessed = 0;
-
- public SchemaAlterStatus() {
-
- }
-
- public SchemaAlterStatus(String hostName, int numberOfOnlineRegions) {
- this.numberOfOnlineRegions = numberOfOnlineRegions;
- this.stamp = System.currentTimeMillis();
- this.currentAlterStatus = AlterState.INPROCESS;
- //this.rsToProcess = activeHosts;
- this.hostName = hostName;
- }
-
- public AlterState getCurrentAlterStatus() {
- return currentAlterStatus;
- }
-
- public void setCurrentAlterStatus(AlterState currentAlterStatus) {
- this.currentAlterStatus = currentAlterStatus;
- }
-
- public int getNumberOfOnlineRegions() {
- return numberOfOnlineRegions;
- }
-
- public void setNumberOfOnlineRegions(int numberOfRegions) {
- this.numberOfOnlineRegions = numberOfRegions;
- }
-
- public int getNumberOfRegionsProcessed() {
- return numberOfRegionsProcessed;
- }
-
- public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
- this.numberOfRegionsProcessed = numberOfRegionsProcessed;
- }
-
- public String getErrorCause() {
- return errorCause;
- }
-
- public void setErrorCause(String errorCause) {
- this.errorCause = errorCause;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public void update(AlterState state, int numberOfRegions, String errorCause) {
- this.currentAlterStatus = state;
- this.numberOfRegionsProcessed = numberOfRegions;
- this.errorCause = errorCause;
- }
-
- public void update(AlterState state, int numberOfRegions) {
- this.currentAlterStatus = state;
- this.numberOfRegionsProcessed = numberOfRegions;
- }
-
- public void update(AlterState state) {
- this.currentAlterStatus = state;
- }
-
- public void update(SchemaAlterStatus status) {
- this.currentAlterStatus = status.getCurrentAlterStatus();
- this.numberOfRegionsProcessed = status.getNumberOfRegionsProcessed();
- this.errorCause = status.getErrorCause();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- currentAlterStatus = AlterState.valueOf(in.readUTF());
- stamp = in.readLong();
- numberOfOnlineRegions = in.readInt();
- hostName = Bytes.toString(Bytes.readByteArray(in));
- numberOfRegionsProcessed = in.readInt();
- errorCause = Bytes.toString(Bytes.readByteArray(in));
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(currentAlterStatus.name());
- out.writeLong(stamp);
- out.writeInt(numberOfOnlineRegions);
- Bytes.writeByteArray(out, Bytes.toBytes(hostName));
- out.writeInt(numberOfRegionsProcessed);
- Bytes.writeByteArray(out, Bytes.toBytes(errorCause));
- }
-
- @Override
- public String toString() {
- return
- " state= " + currentAlterStatus
- + ", ts= " + stamp
- + ", number of online regions = " + numberOfOnlineRegions
- + ", host= " + hostName + " processed regions = " + numberOfRegionsProcessed
- + ", errorCause = " + errorCause;
- }
- }
-
-}
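
The deleted tracker round-trips SchemaAlterStatus through its znode with Writables.getBytes and Writables.getWritable (see createStateNode and getSchemaAlterStatus above). For readers unfamiliar with that Hadoop Writable pattern, a minimal self-contained sketch with a toy Writable; the Counter class below is illustrative, not part of HBase:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Writables;
    import org.apache.hadoop.io.Writable;

    public class WritableRoundTripDemo {
      // Toy stand-in for SchemaAlterStatus: a single int field.
      public static class Counter implements Writable {
        int n;
        @Override
        public void write(DataOutput out) throws IOException {
          out.writeInt(n);
        }
        @Override
        public void readFields(DataInput in) throws IOException {
          n = in.readInt();
        }
      }

      public static void main(String[] args) throws IOException {
        Counter c = new Counter();
        c.n = 42;
        byte[] bytes = Writables.getBytes(c);   // what the tracker wrote to the znode
        Counter back = new Counter();
        Writables.getWritable(bytes, back);     // what the tracker read back
        System.out.println(back.n);             // prints 42
      }
    }
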
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 037e354..e8c99f7 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -37,6 +37,10 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RootRegionServer;
+import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.AsyncCallback;
@@ -49,6 +53,8 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* Internal HBase utility class for ZooKeeper.
*
@@ -719,7 +725,7 @@ public class ZKUtil {
if (isSecureZooKeeper(zkw.getConfiguration())) {
// Certain znodes must be readable by non-authenticated clients
if ((node.equals(zkw.rootServerZNode) == true) ||
- (node.equals(zkw.masterAddressZNode) == true) ||
+ (node.equals(zkw.getMasterAddressZNode()) == true) ||
(node.equals(zkw.clusterIdZNode) == true)) {
return ZooKeeperWatcher.CREATOR_ALL_AND_WORLD_READABLE;
}
@@ -1036,8 +1042,12 @@ public class ZKUtil {
StringBuilder sb = new StringBuilder();
try {
sb.append("HBase is rooted at ").append(zkw.baseZNode);
- sb.append("\nActive master address: ").append(
- ServerName.parseVersionedServerName(getData(zkw, zkw.masterAddressZNode)));
+ sb.append("\nActive master address: ");
+ try {
+ sb.append(MasterAddressTracker.getMasterAddress(zkw));
+ } catch (IOException e) {
+ sb.append("<>");
+ }
sb.append("\nBackup master addresses:");
for (String child : listChildrenNoWatch(zkw,
zkw.backupMasterAddressesZNode)) {
@@ -1199,4 +1209,47 @@ public class ZKUtil {
return data;
}
-}
+
+
+ /**
+ * Get a ServerName from the passed in znode data bytes.
+ * @param data ZNode data with a server name in it; can handle the old style
+ * servername where the servername was hostname and port only. Works too with data that
+ * begins w/ the pb 'PBUF' magic and is then followed by a protobuf that
+ * has a serialized {@link ServerName} in it.
+ * @return Returns null if data is null else converts passed data
+ * to a ServerName instance.
+ */
+ static ServerName dataToServerName(final byte [] data) {
+ if (data == null || data.length <= 0) return null;
+ if (ProtobufUtil.isPBMagicPrefix(data)) {
+ int prefixLen = ProtobufUtil.lengthOfPBMagic();
+ try {
+ RootRegionServer rss =
+ RootRegionServer.newBuilder().mergeFrom(data, prefixLen, data.length - prefixLen).build();
+ HBaseProtos.ServerName sn = rss.getServer();
+ return new ServerName(sn.getHostName(), sn.getPort(), sn.getStartCode());
+ } catch (InvalidProtocolBufferException e) {
+ // A failed parse of the znode is pretty catastrophic. Rather than loop
+ // retrying hoping the bad bytes will change, and rather than change
+ // the signature on this method to add an IOE which will send ripples all
+ // over the code base, throw a RuntimeException. This should "never" happen.
+ // Fail fast if it does.
+ throw new RuntimeException(e);
+ }
+ }
+ // The str returned could be old style -- pre hbase-1502 -- which was
+ // hostname and port separated by a colon rather than hostname, port and
+ // startcode delimited by a ','.
+ String str = Bytes.toString(data);
+ int index = str.indexOf(ServerName.SERVERNAME_SEPARATOR);
+ if (index != -1) {
+ // Presume it's a ServerName serialized with versioned bytes.
+ return ServerName.parseVersionedServerName(data);
+ }
+ // Presume it's a hostname:port format.
+ String hostname = Addressing.parseHostname(str);
+ int port = Addressing.parsePort(str);
+ return new ServerName(hostname, port, -1L);
+ }
+}
\ No newline at end of file
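
For reference, a minimal sketch of the znode encodings the new ZKUtil.dataToServerName helper accepts. The helper is package-private, so the demo assumes it compiles into the same org.apache.hadoop.hbase.zookeeper package; the host, port and startcode values are made up:

    package org.apache.hadoop.hbase.zookeeper;

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DataToServerNameDemo {
      public static void main(String[] args) {
        // 1. Old style, pre hbase-1502: plain "hostname:port" string bytes.
        //    The startcode comes back as -1 because none was recorded.
        ServerName old = ZKUtil.dataToServerName(Bytes.toBytes("example-host:60020"));

        // 2. Versioned ServerName bytes: a version prefix followed by the
        //    "hostname,port,startcode" string. The ',' separator routes this
        //    through ServerName.parseVersionedServerName.
        ServerName sn = new ServerName("example-host", 60020, 1234567890L);
        ServerName versioned = ZKUtil.dataToServerName(sn.getVersionedBytes());

        // 3. Protobuf style: the 'PBUF' magic followed by a serialized
        //    RootRegionServer message (the isPBMagicPrefix branch above).
        //    Building those bytes needs the generated ZooKeeperProtos; see the
        //    sketch after the ZooKeeper.proto hunk below.

        System.out.println(old + " and " + versioned);
      }
    }
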
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 0f83655..4fc105f 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -89,7 +89,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// znode containing ephemeral nodes of the draining regionservers
public String drainingZNode;
// znode of currently active master
- public String masterAddressZNode;
+ private String masterAddressZNode;
// znode of this master in backup master directory, if not the active master
public String backupMasterAddressesZNode;
// znode containing the current cluster state
@@ -102,8 +102,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String clusterIdZNode;
// znode used for log splitting work assignment
public String splitLogZNode;
- // znode used to record table schema changes
- public String schemaZNode;
// Certain ZooKeeper nodes need to be world-readable
 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
@@ -166,7 +164,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
ZKUtil.createAndFailSilent(this, drainingZNode);
ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode);
- ZKUtil.createAndFailSilent(this, schemaZNode);
ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException(
@@ -215,8 +212,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
conf.get("zookeeper.znode.clusterId", "hbaseid"));
splitLogZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
- schemaZNode = ZKUtil.joinZNode(baseZNode,
- conf.get("zookeeper.znode.schema", "schema"));
}
/**
@@ -460,4 +455,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public boolean isAborted() {
return this.abortable.isAborted();
}
+
+ /**
+ * @return Path to the currently active master.
+ */
+ public String getMasterAddressZNode() {
+ return this.masterAddressZNode;
+ }
}
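
With masterAddressZNode private, out-of-package callers now go through the accessor. A minimal sketch of a read path follows; the standalone watcher construction (and the null Abortable) is for illustration only, and it assumes the replacement MasterAddressTracker lives in the zookeeper package, as the ZKUtil.dump() hunk above suggests:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class MasterAddressDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Null Abortable for brevity; production code passes a real one.
        ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "MasterAddressDemo", null);
        try {
          // zkw.masterAddressZNode no longer compiles from outside the class;
          // the getter is the supported way to obtain the path.
          System.out.println("master znode: " + zkw.getMasterAddressZNode());
          ServerName master = MasterAddressTracker.getMasterAddress(zkw);
          System.out.println("active master: " + master);
        } finally {
          zkw.close();
        }
      }
    }
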
diff --git src/main/protobuf/ZooKeeper.proto src/main/protobuf/ZooKeeper.proto
index 20f8eb0..dc85be9 100644
--- src/main/protobuf/ZooKeeper.proto
+++ src/main/protobuf/ZooKeeper.proto
@@ -34,3 +34,11 @@ message RootRegionServer {
// The ServerName hosting the root region currently.
required ServerName server = 1;
}
+
+/**
+ * Content of the master znode.
+ */
+message Master {
+ // The ServerName of the current Master
+ required ServerName master = 1;
+}
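
A sketch of how the new Master message frames the master znode content, mirroring the PB branch of ZKUtil.dataToServerName above. ZooKeeperProtos.Master is what protoc generates from the message added here; ProtobufUtil.prependPBMagic is an assumption mirroring the isPBMagicPrefix/lengthOfPBMagic calls ZKUtil already uses (if it does not exist, prepend the magic bytes by hand):

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

    public class MasterZNodeContentDemo {
      public static void main(String[] args) throws Exception {
        ServerName sn = new ServerName("example-host", 60000, 1234567890L);
        HBaseProtos.ServerName pbsn = HBaseProtos.ServerName.newBuilder()
            .setHostName(sn.getHostname())
            .setPort(sn.getPort())
            .setStartCode(sn.getStartcode())
            .build();
        ZooKeeperProtos.Master master =
            ZooKeeperProtos.Master.newBuilder().setMaster(pbsn).build();
        // Znode content = 'PBUF' magic + serialized Master message
        // (assumed helper, see lead-in).
        byte[] znodeContent = ProtobufUtil.prependPBMagic(master.toByteArray());

        // Parsing mirrors the dataToServerName PB branch:
        int prefixLen = ProtobufUtil.lengthOfPBMagic();
        ZooKeeperProtos.Master parsed = ZooKeeperProtos.Master.newBuilder()
            .mergeFrom(znodeContent, prefixLen, znodeContent.length - prefixLen)
            .build();
        System.out.println(parsed.getMaster().getHostName());
      }
    }
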
diff --git src/main/resources/hbase-default.xml src/main/resources/hbase-default.xml
index 341431a..44ee689 100644
--- src/main/resources/hbase-default.xml
+++ src/main/resources/hbase-default.xml
@@ -780,39 +780,15 @@
-  <property>
-    <name>hbase.coprocessor.abortonerror</name>
-    <value>false</value>
-    <description>
-    Set to true to cause the hosting server (master or regionserver) to
-    abort if a coprocessor throws a Throwable object that is not IOException or
-    a subclass of IOException. Setting it to true might be useful in development
-    environments where one wants to terminate the server as soon as possible to
-    simplify coprocessor failure analysis.
-    </description>
-  </property>
-  <property>
-    <name>hbase.instant.schema.alter.enabled</name>
-    <value>false</value>
-    <description>Whether or not to handle alter schema changes instantly or not.
-    If enabled, all schema change alter operations will be instant, as the master will not
-    explicitly unassign/assign the impacted regions and instead will rely on Region servers to
-    refresh their schema changes. If enabled, the schema alter requests will survive
-    master or RS failures.
-    </description>
-  </property>
-  <property>
-    <name>hbase.instant.schema.janitor.period</name>
-    <value>120000</value>
-    <description>The Schema Janitor process wakes up every millis and sweeps all
-    expired/failed schema change requests.
-    </description>
-  </property>
-  <property>
-    <name>hbase.instant.schema.alter.timeout</name>
-    <value>60000</value>
-    <description>Timeout in millis after which any pending schema alter request will be
-    considered as failed.
-    </description>
-  </property>
+  <property>
+    <name>hbase.coprocessor.abortonerror</name>
+    <value>false</value>
+    <description>
+    Set to true to cause the hosting server (master or regionserver) to
+    abort if a coprocessor throws a Throwable object that is not IOException or
+    a subclass of IOException. Setting it to true might be useful in development
+    environments where one wants to terminate the server as soon as possible to
+    simplify coprocessor failure analysis.
+    </description>
+  </property>
   <property>
     <name>hbase.online.schema.update.enable</name>
diff --git src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java
index 378c2b4..e69de29 100644
--- src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java
+++ src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class InstantSchemaChangeTestBase {
-
- final Log LOG = LogFactory.getLog(getClass());
- protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- protected HBaseAdmin admin;
- protected static MiniHBaseCluster miniHBaseCluster = null;
- protected Configuration conf;
- protected static MasterSchemaChangeTracker msct = null;
-
- protected final byte [] row = Bytes.toBytes("row");
- protected final byte [] qualifier = Bytes.toBytes("qualifier");
- final byte [] value = Bytes.toBytes("value");
-
- @Before
- public void setUpBeforeClass() throws Exception {
- TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
- TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
- TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
- TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
- TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
- TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
- TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
- //
- miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
- msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
-
- }
-
- @After
- public void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- /**
- * Find the RS that is currently holding our online region.
- * @param tableName
- * @return
- */
- protected HRegionServer findRSWithOnlineRegionFor(String tableName) {
- List<JVMClusterUtil.RegionServerThread> rsThreads =
- miniHBaseCluster.getLiveRegionServerThreads();
- for (JVMClusterUtil.RegionServerThread rsT : rsThreads) {
- HRegionServer rs = rsT.getRegionServer();
- List<HRegion> regions = rs.getOnlineRegions(Bytes.toBytes(tableName));
- if (regions != null && !regions.isEmpty()) {
- return rs;
- }
- }
- return null;
- }
-
- protected void waitForSchemaChangeProcess(final String tableName)
- throws KeeperException, InterruptedException {
- waitForSchemaChangeProcess(tableName, 10000);
- }
-
- /**
- * This is a pretty low cost signalling mechanism. It is quite possible that we will
- * miss the ZK node creation signal, as in some cases the schema change process
- * happens rather quickly and our thread waiting for ZK node creation might wait forever.
- * The fool-proof strategy would be to directly listen for ZK events.
- * @param tableName
- * @throws KeeperException
- * @throws InterruptedException
- */
- protected void waitForSchemaChangeProcess(final String tableName, final long waitTimeMills)
- throws KeeperException, InterruptedException {
- LOG.info("Waiting for ZK node creation for table = " + tableName);
- final MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- final Runnable r = new Runnable() {
- public void run() {
- try {
- while(!msct.doesSchemaChangeNodeExists(tableName)) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- } catch (KeeperException ke) {
- ke.printStackTrace();
- }
- LOG.info("Waiting for ZK node deletion for table = " + tableName);
- try {
- while(msct.doesSchemaChangeNodeExists(tableName)) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- } catch (KeeperException ke) {
- ke.printStackTrace();
- }
- }
- };
- Thread t = new Thread(r);
- t.start();
- if (waitTimeMills > 0) {
- t.join(waitTimeMills);
- } else {
- t.join(10000);
- }
- }
-
- protected HTable createTableAndValidate(String tableName) throws IOException {
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start createTableAndValidate()");
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- HTableDescriptor[] tables = admin.listTables();
- int numTables = 0;
- if (tables != null) {
- numTables = tables.length;
- }
- HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
- HConstants.CATALOG_FAMILY);
- tables = this.admin.listTables();
- assertEquals(numTables + 1, tables.length);
- LOG.info("created table = " + tableName);
- return ht;
- }
-
-}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java
deleted file mode 100644
index 4ac2847..0000000
--- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestInstantSchemaChange extends InstantSchemaChangeTestBase {
-
- final Log LOG = LogFactory.getLog(getClass());
-
- @Test
- public void testInstantSchemaChangeForModifyTable() throws IOException,
- KeeperException, InterruptedException {
-
- String tableName = "testInstantSchemaChangeForModifyTable";
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start testInstantSchemaChangeForModifyTable()");
- HTable ht = createTableAndValidate(tableName);
-
- String newFamily = "newFamily";
- HTableDescriptor htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor(newFamily));
-
- admin.modifyTable(Bytes.toBytes(tableName), htd);
- waitForSchemaChangeProcess(tableName);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
-
- Put put1 = new Put(row);
- put1.add(Bytes.toBytes(newFamily), qualifier, value);
- ht.put(put1);
-
- Get get1 = new Get(row);
- get1.addColumn(Bytes.toBytes(newFamily), qualifier);
- Result r = ht.get(get1);
- byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
- int result = Bytes.compareTo(value, tvalue);
- assertEquals(result, 0);
- LOG.info("END testInstantSchemaChangeForModifyTable()");
- ht.close();
- }
-
- @Test
- public void testInstantSchemaChangeForAddColumn() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("Start testInstantSchemaChangeForAddColumn() ");
- String tableName = "testSchemachangeForAddColumn";
- HTable ht = createTableAndValidate(tableName);
- String newFamily = "newFamily";
- HColumnDescriptor hcd = new HColumnDescriptor("newFamily");
-
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- waitForSchemaChangeProcess(tableName);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
-
- Put put1 = new Put(row);
- put1.add(Bytes.toBytes(newFamily), qualifier, value);
- LOG.info("******** Put into new column family ");
- ht.put(put1);
-
- Get get1 = new Get(row);
- get1.addColumn(Bytes.toBytes(newFamily), qualifier);
- Result r = ht.get(get1);
- byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
- LOG.info(" Value put = " + value + " value from table = " + tvalue);
- int result = Bytes.compareTo(value, tvalue);
- assertEquals(result, 0);
- LOG.info("End testInstantSchemaChangeForAddColumn() ");
- ht.close();
- }
-
- @Test
- public void testInstantSchemaChangeForModifyColumn() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("Start testInstantSchemaChangeForModifyColumn() ");
- String tableName = "testSchemachangeForModifyColumn";
- createTableAndValidate(tableName);
-
- HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
- hcd.setMaxVersions(99);
- hcd.setBlockCacheEnabled(false);
-
- admin.modifyColumn(Bytes.toBytes(tableName), hcd);
- waitForSchemaChangeProcess(tableName);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
-
- List<HRegion> onlineRegions
- = miniHBaseCluster.getRegions(Bytes.toBytes("testSchemachangeForModifyColumn"));
- for (HRegion onlineRegion : onlineRegions) {
- HTableDescriptor htd = onlineRegion.getTableDesc();
- HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
- assertTrue(tableHcd.isBlockCacheEnabled() == false);
- assertEquals(tableHcd.getMaxVersions(), 99);
- }
- LOG.info("End testInstantSchemaChangeForModifyColumn() ");
-
- }
-
- @Test
- public void testInstantSchemaChangeForDeleteColumn() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("Start testInstantSchemaChangeForDeleteColumn() ");
- String tableName = "testSchemachangeForDeleteColumn";
- int numTables = 0;
- HTableDescriptor[] tables = admin.listTables();
- if (tables != null) {
- numTables = tables.length;
- }
-
- byte[][] FAMILIES = new byte[][] {
- Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
-
- HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
- FAMILIES);
- tables = this.admin.listTables();
- assertEquals(numTables + 1, tables.length);
- LOG.info("Table testSchemachangeForDeleteColumn created");
-
- admin.deleteColumn(tableName, "C");
-
- waitForSchemaChangeProcess(tableName);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
- HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName));
- HColumnDescriptor hcd = modifiedHtd.getFamily(Bytes.toBytes("C"));
- assertTrue(hcd == null);
- LOG.info("End testInstantSchemaChangeForDeleteColumn() ");
- ht.close();
- }
-
- @Test
- public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException,
- KeeperException {
- final String tableName = "testInstantSchemaChangeWhenTableIsDisabled";
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()");
- HTable ht = createTableAndValidate(tableName);
- // Disable table
- admin.disableTable("testInstantSchemaChangeWhenTableIsDisabled");
- // perform schema changes
- HColumnDescriptor hcd = new HColumnDescriptor("newFamily");
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- assertTrue(msct.doesSchemaChangeNodeExists(tableName) == false);
- ht.close();
- }
-
- /**
- * Test that when concurrent alter requests are received for a table we don't miss any.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException,
- KeeperException, InterruptedException {
- final String tableName = "testConcurrentInstantSchemaChangeForModifyTable";
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()");
- HTable ht = createTableAndValidate(tableName);
-
- Runnable run1 = new Runnable() {
- public void run() {
- HColumnDescriptor hcd = new HColumnDescriptor("family1");
- try {
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- } catch (IOException ioe) {
- ioe.printStackTrace();
-
- }
- }
- };
- Runnable run2 = new Runnable() {
- public void run() {
- HColumnDescriptor hcd = new HColumnDescriptor("family2");
- try {
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- } catch (IOException ioe) {
- ioe.printStackTrace();
-
- }
- }
- };
-
- run1.run();
- // We have to add a sleep here as in concurrent scenarios the HTD update
- // in HDFS fails and returns with null HTD. This needs to be investigated,
- // but it doesn't impact the instant alter functionality in any way.
- Thread.sleep(100);
- run2.run();
-
- waitForSchemaChangeProcess(tableName);
-
- Put put1 = new Put(row);
- put1.add(Bytes.toBytes("family1"), qualifier, value);
- ht.put(put1);
-
- Get get1 = new Get(row);
- get1.addColumn(Bytes.toBytes("family1"), qualifier);
- Result r = ht.get(get1);
- byte[] tvalue = r.getValue(Bytes.toBytes("family1"), qualifier);
- int result = Bytes.compareTo(value, tvalue);
- assertEquals(result, 0);
- Thread.sleep(10000);
-
- Put put2 = new Put(row);
- put2.add(Bytes.toBytes("family2"), qualifier, value);
- ht.put(put2);
-
- Get get2 = new Get(row);
- get2.addColumn(Bytes.toBytes("family2"), qualifier);
- Result r2 = ht.get(get2);
- byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
- int result2 = Bytes.compareTo(value, tvalue2);
- assertEquals(result2, 0);
- LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()");
- ht.close();
- }
-
- /**
- * The schema change request blocks while a LB run is in progress. This
- * test validates this behavior.
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- @Test
- public void testConcurrentInstantSchemaChangeAndLoadBalancerRun() throws IOException,
- InterruptedException, KeeperException {
- final String tableName = "testInstantSchemaChangeWithLoadBalancerRunning";
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start testInstantSchemaChangeWithLoadBalancerRunning()");
- final String newFamily = "newFamily";
- HTable ht = createTableAndValidate(tableName);
- final MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-
-
- Runnable balancer = new Runnable() {
- public void run() {
- // run the balancer now.
- miniHBaseCluster.getMaster().balance();
- }
- };
-
- Runnable schemaChanger = new Runnable() {
- public void run() {
- HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
- try {
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- } catch (IOException ioe) {
- ioe.printStackTrace();
-
- }
- }
- };
-
- balancer.run();
- schemaChanger.run();
- waitForSchemaChangeProcess(tableName, 40000);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
-
- Put put1 = new Put(row);
- put1.add(Bytes.toBytes(newFamily), qualifier, value);
- LOG.info("******** Put into new column family ");
- ht.put(put1);
- ht.flushCommits();
-
- LOG.info("******** Get from new column family ");
- Get get1 = new Get(row);
- get1.addColumn(Bytes.toBytes(newFamily), qualifier);
- Result r = ht.get(get1);
- byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
- LOG.info(" Value put = " + value + " value from table = " + tvalue);
- int result = Bytes.compareTo(value, tvalue);
- assertEquals(result, 0);
-
- LOG.info("End testInstantSchemaChangeWithLoadBalancerRunning() ");
- ht.close();
- }
-
-
- /**
- * This test validates two things. One is that the LoadBalancer does not run when a schema
- * change process is in progress. The second is that failed/expired
- * schema changes are expired to unblock the load balancer run.
- *
- */
- @Test (timeout=70000)
- public void testLoadBalancerBlocksDuringSchemaChangeRequests() throws KeeperException,
- IOException, InterruptedException {
- LOG.info("Start testConcurrentLoadBalancerSchemaChangeRequests() ");
- final MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- // Test that the load balancer does not run while an in-flight schema
- // change operation is in progress.
- // Simulate a new schema change request.
- msct.createSchemaChangeNode("testLoadBalancerBlocks", 0);
- // The schema change node is created.
- assertTrue(msct.doesSchemaChangeNodeExists("testLoadBalancerBlocks"));
- // Now, request an explicit LB run.
-
- Runnable balancer1 = new Runnable() {
- public void run() {
- // run the balancer now.
- miniHBaseCluster.getMaster().balance();
- }
- };
- balancer1.run();
-
- // Load balancer should not run now.
- assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
- LOG.debug("testConcurrentLoadBalancerSchemaChangeRequests Asserted");
- LOG.info("End testConcurrentLoadBalancerSchemaChangeRequests() ");
- }
-
- /**
- * Test that instant schema change blocks while LB is running.
- * @throws KeeperException
- * @throws IOException
- * @throws InterruptedException
- */
- @Test (timeout=10000)
- public void testInstantSchemaChangeBlocksDuringLoadBalancerRun() throws KeeperException,
- IOException, InterruptedException {
- final MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-
- final String tableName = "testInstantSchemaChangeBlocksDuringLoadBalancerRun";
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start testInstantSchemaChangeBlocksDuringLoadBalancerRun()");
- final String newFamily = "newFamily";
- createTableAndValidate(tableName);
-
- // Test that the schema change request does not run while an in-flight LB run
- // is in progress.
- // First, request an explicit LB run.
-
- Runnable balancer1 = new Runnable() {
- public void run() {
- // run the balancer now.
- miniHBaseCluster.getMaster().balance();
- }
- };
-
- Runnable schemaChanger = new Runnable() {
- public void run() {
- HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
- try {
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- } catch (IOException ioe) {
- ioe.printStackTrace();
-
- }
- }
- };
-
- Thread t1 = new Thread(balancer1);
- Thread t2 = new Thread(schemaChanger);
- t1.start();
- t2.start();
-
- // check that they both happen concurrently
- Runnable balancerCheck = new Runnable() {
- public void run() {
- // check whether balancer is running.
- while(!miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- try {
- assertFalse(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
- } catch (KeeperException ke) {
- ke.printStackTrace();
- }
- LOG.debug("Load Balancer is now running or skipped");
- while(miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
- try {
- assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
- } catch (KeeperException ke) {
-
- }
-
- }
- };
-
- Thread t = new Thread(balancerCheck);
- t.start();
- t.join(1000);
- // Load balancer should not run now.
- //assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
- // Schema change request node should now exist.
- // assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
- LOG.debug("testInstantSchemaChangeBlocksDuringLoadBalancerRun Asserted");
- LOG.info("End testInstantSchemaChangeBlocksDuringLoadBalancerRun() ");
- }
-
- /**
- * To test the schema janitor (that it cleans expired/failed schema alter attempts) we
- * simply create a fake table (that doesn't exist, with a fake number of online regions) in ZK.
- * This schema alter request will time out (after 30 seconds) and our janitor will clean it up.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void testInstantSchemaJanitor() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("testInstantSchemaWithFailedExpiredOperations() ");
- String fakeTableName = "testInstantSchemaWithFailedExpiredOperations";
- MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- msct.createSchemaChangeNode(fakeTableName, 10);
- LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName)
- + " created");
- Thread.sleep(40000);
- assertFalse(msct.doesSchemaChangeNodeExists(fakeTableName));
- LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName)
- + " deleted");
- LOG.info("END testInstantSchemaWithFailedExpiredOperations() ");
- }
-
-
-
- @org.junit.Rule
- public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
- new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
diff --git src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
deleted file mode 100644
index c1490eb..0000000
--- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestInstantSchemaChangeFailover {
-
- final Log LOG = LogFactory.getLog(getClass());
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private HBaseAdmin admin;
- private static MiniHBaseCluster miniHBaseCluster = null;
- private Configuration conf;
- private ZooKeeperWatcher zkw;
- private static MasterSchemaChangeTracker msct = null;
-
- private final byte [] row = Bytes.toBytes("row");
- private final byte [] qualifier = Bytes.toBytes("qualifier");
- final byte [] value = Bytes.toBytes("value");
-
- @Before
- public void setUpBeforeClass() throws Exception {
- TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
- TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
- TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
- TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
- TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
- TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
- TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
- //
- miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
- msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
- }
-
- @After
- public void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- /**
- * This is a pretty low cost signalling mechanism. It is quite possible that we will
- * miss the ZK node creation signal, as in some cases the schema change process
- * happens rather quickly and our thread waiting for ZK node creation might wait forever.
- * The fool-proof strategy would be to directly listen for ZK events.
- * @param tableName
- * @throws KeeperException
- * @throws InterruptedException
- */
- private void waitForSchemaChangeProcess(final String tableName)
- throws KeeperException, InterruptedException {
- LOG.info("Waiting for ZK node creation for table = " + tableName);
- final MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- final Runnable r = new Runnable() {
- public void run() {
- try {
- while(!msct.doesSchemaChangeNodeExists(tableName)) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- } catch (KeeperException ke) {
- ke.printStackTrace();
- }
-
- LOG.info("Waiting for ZK node deletion for table = " + tableName);
- try {
- while(msct.doesSchemaChangeNodeExists(tableName)) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- } catch (KeeperException ke) {
- ke.printStackTrace();
- }
- }
- };
- Thread t = new Thread(r);
- t.start();
- t.join(10000);
- }
-
-
- /**
- * Kill a random RS and see that the schema change can succeed.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test (timeout=50000)
- public void testInstantSchemaChangeWhileRSCrash() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("Start testInstantSchemaChangeWhileRSCrash()");
- zkw = miniHBaseCluster.getMaster().getZooKeeperWatcher();
-
- final String tableName = "TestRSCrashDuringSchemaChange";
- HTable ht = createTableAndValidate(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor("family2");
- admin.addColumn(Bytes.toBytes(tableName), hcd);
-
- miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change");
- // Let the dust settle down
- Thread.sleep(10000);
- waitForSchemaChangeProcess(tableName);
- Put put2 = new Put(row);
- put2.add(Bytes.toBytes("family2"), qualifier, value);
- ht.put(put2);
-
- Get get2 = new Get(row);
- get2.addColumn(Bytes.toBytes("family2"), qualifier);
- Result r2 = ht.get(get2);
- byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
- int result2 = Bytes.compareTo(value, tvalue2);
- assertEquals(result2, 0);
- String nodePath = msct.getSchemaChangeNodePathForTable("TestRSCrashDuringSchemaChange");
- assertTrue(ZKUtil.checkExists(zkw, nodePath) == -1);
- LOG.info("result2 = " + result2);
- LOG.info("end testInstantSchemaChangeWhileRSCrash()");
- ht.close();
- }
-
- /**
- * Randomly bring down/up RS servers while a schema change is in progress. This test
- * is the same as the one above; the only difference is that we intend to kill and start
- * new RS instances while a schema change is in progress.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test (timeout=70000)
- public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()");
- miniHBaseCluster.getRegionServer(4).abort("Killing RS 4");
- // Start a new RS before schema change .
- // Commenting the start RS as it is failing with DFS user permission NPE.
- //miniHBaseCluster.startRegionServer();
-
- // Let the dust settle
- Thread.sleep(10000);
- final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart";
- HTable ht = createTableAndValidate(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor("family2");
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- // Kill 2 RS now.
- miniHBaseCluster.getRegionServer(2).abort("Killing RS 2");
- // Let the dust settle
- Thread.sleep(10000);
- // We will be left with only one RS.
- waitForSchemaChangeProcess(tableName);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
- Put put2 = new Put(row);
- put2.add(Bytes.toBytes("family2"), qualifier, value);
- ht.put(put2);
-
- Get get2 = new Get(row);
- get2.addColumn(Bytes.toBytes("family2"), qualifier);
- Result r2 = ht.get(get2);
- byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
- int result2 = Bytes.compareTo(value, tvalue2);
- assertEquals(result2, 0);
- LOG.info("result2 = " + result2);
- LOG.info("end testInstantSchemaChangeWhileRandomRSCrashAndStart()");
- ht.close();
- }
-
- /**
- * Test scenario where primary master is brought down while processing an
- * alter request. This is a harder one, as it is very difficult to time this.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
-
- @Test (timeout=50000)
- public void testInstantSchemaChangeWhileMasterFailover() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("Start testInstantSchemaChangeWhileMasterFailover()");
- //Thread.sleep(5000);
-
- final String tableName = "testInstantSchemaChangeWhileMasterFailover";
- HTable ht = createTableAndValidate(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor("family2");
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- // Kill primary master now.
- Thread.sleep(50);
- miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
-
- // It may not be possible for us to check the schema change status
- // using waitForSchemaChangeProcess as our ZK session in MasterSchemaChangeTracker will be
- // lost when master dies and hence may not be accurate. So relying on old-fashioned
- // sleep here.
- Thread.sleep(25000);
- Put put2 = new Put(row);
- put2.add(Bytes.toBytes("family2"), qualifier, value);
- ht.put(put2);
-
- Get get2 = new Get(row);
- get2.addColumn(Bytes.toBytes("family2"), qualifier);
- Result r2 = ht.get(get2);
- byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
- int result2 = Bytes.compareTo(value, tvalue2);
- assertEquals(result2, 0);
- LOG.info("result2 = " + result2);
- LOG.info("end testInstantSchemaChangeWhileMasterFailover()");
- ht.close();
- }
-
- /**
- * Test the master failover during a schema change request in ZK.
- * We create a fake schema change request in ZK and abort the primary master
- * mid-flight to simulate a master fail over scenario during a mid-flight
- * schema change process. The new master's schema janitor will eventually
- * cleanup this fake request after time out.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Ignore
- @Test
- public void testInstantSchemaOperationsInZKForMasterFailover() throws IOException,
- KeeperException, InterruptedException {
- LOG.info("testInstantSchemaOperationsInZKForMasterFailover() ");
- String tableName = "testInstantSchemaOperationsInZKForMasterFailover";
-
- conf = TEST_UTIL.getConfiguration();
- MasterSchemaChangeTracker activesct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- activesct.createSchemaChangeNode(tableName, 10);
- LOG.debug(activesct.getSchemaChangeNodePathForTable(tableName)
- + " created");
- assertTrue(activesct.doesSchemaChangeNodeExists(tableName));
- // Kill primary master now.
- miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
- // wait for 50 secs. This is so that our schema janitor from fail-over master will kick-in and
- // cleanup this failed/expired schema change request.
- Thread.sleep(50000);
- MasterSchemaChangeTracker newmsct = miniHBaseCluster.getMaster().getSchemaChangeTracker();
- assertFalse(newmsct.doesSchemaChangeNodeExists(tableName));
- LOG.debug(newmsct.getSchemaChangeNodePathForTable(tableName)
- + " deleted");
- LOG.info("END testInstantSchemaOperationsInZKForMasterFailover() ");
- }
-
- private HTable createTableAndValidate(String tableName) throws IOException {
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start createTableAndValidate()");
- HTableDescriptor[] tables = admin.listTables();
- int numTables = 0;
- if (tables != null) {
- numTables = tables.length;
- }
- HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
- HConstants.CATALOG_FAMILY);
- tables = this.admin.listTables();
- assertEquals(numTables + 1, tables.length);
- LOG.info("created table = " + tableName);
- return ht;
- }
-
-
- @org.junit.Rule
- public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
- new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
-
-
-
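Both failover tests above rely on fixed Thread.sleep calls, a limitation the in-code comments acknowledge. A small polling helper could bound the wait without the flakiness; this is a sketch only, and TestWaitUtil, Condition, and waitFor are illustrative names, not part of the original test base:

// Hypothetical polling helper (sketch): wait for a condition instead of a
// fixed Thread.sleep(). All names here are illustrative, not existing API.
final class TestWaitUtil {
  interface Condition {
    boolean evaluate() throws Exception;
  }

  static void waitFor(long timeoutMs, long intervalMs, Condition cond)
      throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (cond.evaluate()) {
        return; // condition met early; no need to sleep out the full timeout
      }
      Thread.sleep(intervalMs);
    }
    throw new AssertionError("condition not met within " + timeoutMs + " ms");
  }
}

The 25- and 50-second sleeps could then poll, for example, until doesSchemaChangeNodeExists(tableName) on a tracker obtained from the new master returns false.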
diff --git src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeSplit.java src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeSplit.java
deleted file mode 100644
index 8f3124b..0000000
--- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeSplit.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestInstantSchemaChangeSplit extends InstantSchemaChangeTestBase {
-
- final Log LOG = LogFactory.getLog(getClass());
-
- /**
- * The objective of the following test is to validate that schema exclusions
- * happen properly. When a region server dies or crashes mid-flight during a
- * schema refresh, we exclude all online regions on that RS, as well as the
- * RS itself, from the schema change process.
- *
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void testInstantSchemaChangeExclusions() throws IOException,
- KeeperException, InterruptedException {
- MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
- LOG.info("Start testInstantSchemaChangeExclusions() ");
- String tableName = "testInstantSchemaChangeExclusions";
- HTable ht = createTableAndValidate(tableName);
-
- HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
- hcd.setMaxVersions(99);
- hcd.setBlockCacheEnabled(false);
-
- HRegionServer hrs = findRSWithOnlineRegionFor(tableName);
- admin.modifyColumn(Bytes.toBytes(tableName), hcd);
- hrs.abort("Aborting for tests");
- hrs.getSchemaChangeTracker().setSleepTimeMillis(20000);
-
- LOG.debug("Waiting for Schema Change process to complete");
- waitForSchemaChangeProcess(tableName, 15000);
- assertFalse(msct.doesSchemaChangeNodeExists(tableName));
- // Sleep for some time so that our region is reassigned to some other RS
- // by master.
- Thread.sleep(10000);
- List<HRegion> onlineRegions
- = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
- assertTrue(!onlineRegions.isEmpty());
- for (HRegion onlineRegion : onlineRegions) {
- HTableDescriptor htd = onlineRegion.getTableDesc();
- HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
- assertFalse(tableHcd.isBlockCacheEnabled());
- assertEquals(99, tableHcd.getMaxVersions());
- }
- LOG.info("End testInstantSchemaChangeExclusions() ");
- ht.close();
- }
-
- /**
- * This test validates that when a schema change request fails on the
- * RS side, we appropriately register the failure in the Master Schema change
- * tracker's node as well as capture the error cause.
- *
- * Currently an alter request fails if the RS fails with an IO exception,
- * say due to a missing or incorrect codec. With instant schema change the
- * same failure happens, and we register the failure with its cause and
- * update the monitor status appropriately.
- *
- * The region(s) will be orphaned in both the cases.
- *
- */
- @Test
- public void testInstantSchemaChangeWhileRSOpenRegionFailure() throws IOException,
- KeeperException, InterruptedException {
- MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-
- LOG.info("Start testInstantSchemaChangeWhileRSOpenRegionFailure() ");
- String tableName = "testInstantSchemaChangeWhileRSOpenRegionFailure";
- HTable ht = createTableAndValidate(tableName);
-
- // Create 10 regions now.
- TEST_UTIL.createMultiRegions(conf, ht,
- HConstants.CATALOG_FAMILY, 10);
-
- // wait for all the regions to be assigned
- Thread.sleep(10000);
- List<HRegion> onlineRegions
- = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
- LOG.info("Number of online regions = " + onlineRegions.size());
-
- HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
- hcd.setMaxVersions(99);
- hcd.setBlockCacheEnabled(false);
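- // SNAPPY support is assumed to be absent on the test machine, so
- // re-opening regions with this descriptor should fail with an IOException.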
- hcd.setCompressionType(Compression.Algorithm.SNAPPY);
-
- admin.modifyColumn(Bytes.toBytes(tableName), hcd);
- Thread.sleep(100);
-
- assertTrue(msct.doesSchemaChangeNodeExists(tableName));
- Thread.sleep(10000);
- // Get the current alter status and validate that it is a failure with an
- // appropriate error cause.
- MasterSchemaChangeTracker.MasterAlterStatus mas = msct.getMasterAlterStatus(tableName);
- assertNotNull(mas);
- assertEquals(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE,
- mas.getCurrentAlterStatus());
- assertNotNull(mas.getErrorCause());
- LOG.info("End testInstantSchemaChangeWhileRSOpenRegionFailure() ");
- ht.close();
- }
-
- @Test
- public void testConcurrentInstantSchemaChangeAndSplit() throws IOException,
- InterruptedException, KeeperException {
- final String tableName = "testConcurrentInstantSchemaChangeAndSplit";
- conf = TEST_UTIL.getConfiguration();
- LOG.info("Start testConcurrentInstantSchemaChangeAndSplit()");
- final String newFamily = "newFamily";
- HTable ht = createTableAndValidate(tableName);
- final MasterSchemaChangeTracker msct =
- TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-
- // Create 4 regions now.
- TEST_UTIL.createMultiRegions(conf, ht,
- HConstants.CATALOG_FAMILY, 4);
- int rowCount = TEST_UTIL.loadTable(ht, HConstants.CATALOG_FAMILY);
- LOG.info("Loaded " + rowCount + " rows into " + tableName);
-
- Runnable splitter = new Runnable() {
- public void run() {
- // run the splits now.
- try {
- LOG.info("Splitting table now ");
- admin.split(Bytes.toBytes(tableName));
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
-
- Runnable schemaChanger = new Runnable() {
- public void run() {
- HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
- try {
- admin.addColumn(Bytes.toBytes(tableName), hcd);
- } catch (IOException ioe) {
- ioe.printStackTrace();
-
- }
- }
- };
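- // Note: addColumn is asynchronous here, so running the Runnables inline
- // still lets the split overlap the in-flight schema change.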
- schemaChanger.run();
- Thread.sleep(50);
- splitter.run();
- waitForSchemaChangeProcess(tableName, 40000);
-
- Put put1 = new Put(row);
- put1.add(Bytes.toBytes(newFamily), qualifier, value);
- LOG.info("******** Put into new column family ");
- ht.put(put1);
- ht.flushCommits();
-
- LOG.info("******** Get from new column family ");
- Get get1 = new Get(row);
- get1.addColumn(Bytes.toBytes(newFamily), qualifier);
- Result r = ht.get(get1);
- byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
- LOG.info(" Value put = " + value + " value from table = " + tvalue);
- int result = Bytes.compareTo(value, tvalue);
- assertEquals(result, 0);
- LOG.info("End testConcurrentInstantSchemaChangeAndSplit() ");
- ht.close();
- }
-
-
-
- @org.junit.Rule
- public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
- new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
-
-
-
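The concurrent test above invokes both Runnables inline, which works because the alter request is asynchronous. If genuine thread-level overlap were wanted instead, the three invocation lines could become something like this sketch, reusing the test's schemaChanger and splitter:

// Sketch: drive the schema change and the split from separate threads.
// schemaChanger and splitter are the Runnables defined in the test above.
Thread alterThread = new Thread(schemaChanger, "schema-changer");
Thread splitThread = new Thread(splitter, "splitter");
alterThread.start();
Thread.sleep(50); // stagger the split slightly, as the inline version does
splitThread.start();
alterThread.join(); // wait for both before asserting on the new family
splitThread.join();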
diff --git src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index d2b3060..41616c8 100644
--- src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -361,6 +361,12 @@ class MockRegionServer implements HRegionInterface, RegionServerServices {
}
@Override
+ public List getOnlineRegions(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
public HServerInfo getHServerInfo() throws IOException {
// TODO Auto-generated method stub
return null;
@@ -514,17 +520,6 @@ class MockRegionServer implements HRegionInterface, RegionServerServices {
}
@Override
- public List getOnlineRegions(byte[] tableName) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void refreshRegion(HRegion hRegion) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
public Configuration getConfiguration() {
return this.conf;
}
diff --git src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
index 05f6b1a..45fa627 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java
@@ -29,14 +29,17 @@ import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -66,7 +69,7 @@ public class TestActiveMasterManager {
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null, true);
try {
- ZKUtil.deleteNode(zk, zk.masterAddressZNode);
+ ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
ZKUtil.deleteNode(zk, zk.clusterStateZNode);
} catch(KeeperException.NoNodeException nne) {}
@@ -108,7 +111,7 @@ public class TestActiveMasterManager {
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null, true);
try {
- ZKUtil.deleteNode(zk, zk.masterAddressZNode);
+ ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
ZKUtil.deleteNode(zk, zk.clusterStateZNode);
} catch(KeeperException.NoNodeException nne) {}
@@ -154,11 +157,11 @@ public class TestActiveMasterManager {
ms1.stop("stopping first server");
// Use a listener to capture when the node is actually deleted
- NodeDeletionListener listener = new NodeDeletionListener(zk, zk.masterAddressZNode);
+ NodeDeletionListener listener = new NodeDeletionListener(zk, zk.getMasterAddressZNode());
zk.registerListener(listener);
LOG.info("Deleting master node");
- ZKUtil.deleteNode(zk, zk.masterAddressZNode);
+ ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
// Wait for the node to be deleted
LOG.info("Waiting for active master manager to be notified");
@@ -178,7 +181,7 @@ public class TestActiveMasterManager {
assertTrue(t.isActiveMaster);
LOG.info("Deleting master node");
- ZKUtil.deleteNode(zk, zk.masterAddressZNode);
+ ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
}
/**
@@ -186,12 +189,12 @@ public class TestActiveMasterManager {
* @param zk
 * @param expectedAddress
* @throws KeeperException
+ * @throws IOException
*/
private void assertMaster(ZooKeeperWatcher zk,
ServerName expectedAddress)
- throws KeeperException {
- ServerName readAddress =
- ServerName.parseVersionedServerName(ZKUtil.getData(zk, zk.masterAddressZNode));
+ throws KeeperException, IOException {
+ ServerName readAddress = MasterAddressTracker.getMasterAddress(zk);
assertNotNull(readAddress);
assertTrue(expectedAddress.equals(readAddress));
}
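The hunks above consistently replace direct znode access (zk.masterAddressZNode plus ZKUtil.getData and ServerName.parseVersionedServerName) with the MasterAddressTracker accessors. Reading the active master now looks roughly like this sketch; MasterLookup and activeMaster are illustrative names, and zk is an already-started ZooKeeperWatcher:

import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

final class MasterLookup {
  private MasterLookup() {}

  // Resolve the active master through the tracker instead of parsing the
  // znode bytes by hand; the tracker owns the znode path and its format.
  static ServerName activeMaster(ZooKeeperWatcher zk)
      throws KeeperException, IOException {
    // Returns null if no master has registered itself yet.
    return MasterAddressTracker.getMasterAddress(zk);
  }
}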
diff --git src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 227c5f2..b4dcb83 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -51,15 +51,12 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -156,6 +153,11 @@ public class TestCatalogJanitor {
}
@Override
+ public void checkTableModifiable(byte[] tableName) throws IOException {
+ //no-op
+ }
+
+ @Override
public void createTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException {
// no-op
@@ -171,11 +173,6 @@ public class TestCatalogJanitor {
return null;
}
- public void checkTableModifiable(byte[] tableName,
- EventHandler.EventType eventType)
- throws IOException {
- }
-
@Override
public MasterFileSystem getMasterFileSystem() {
return this.mfs;
@@ -263,14 +260,6 @@ public class TestCatalogJanitor {
};
}
- public MasterSchemaChangeTracker getSchemaChangeTracker() {
- return null;
- }
-
- public RegionServerTracker getRegionServerTracker() {
- return null;
- }
-
@Override
public boolean isServerShutdownHandlerEnabled() {
return true;
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
index e91d83c..21e0c02 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -69,7 +70,7 @@ public class TestMasterAddressManager {
zk.registerListener(addressManager);
// Use a listener to capture when the node is actually created
- NodeCreationListener listener = new NodeCreationListener(zk, zk.masterAddressZNode);
+ NodeCreationListener listener = new NodeCreationListener(zk, zk.getMasterAddressZNode());
zk.registerListener(listener);
// Create the master node with a dummy address
@@ -77,7 +78,7 @@ public class TestMasterAddressManager {
int port = 1234;
ServerName sn = new ServerName(host, port, System.currentTimeMillis());
LOG.info("Creating master node");
- ZKUtil.createEphemeralNodeAndWatch(zk, zk.masterAddressZNode, sn.getVersionedBytes());
+ MasterAddressTracker.setMasterAddress(zk, zk.getMasterAddressZNode(), sn);
// Wait for the node to be created
LOG.info("Waiting for master address manager to be notified");
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java
index 8478260..ffce7e8 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestRSStatusServlet.java
@@ -28,12 +28,12 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterAddressTracker;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.tmpl.regionserver.RSStatusTmpl;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Before;
import org.junit.Test;
diff --git src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index bb3ddd7..7d02759 100644
--- src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -63,9 +63,6 @@ public class MockRegionServerServices implements RegionServerServices {
return null;
}
- public void refreshRegion(HRegion hRegion) throws IOException {
- }
-
@Override
public void addToOnlineRegions(HRegion r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r);