+ * Does not transition nodes from other states. If a node already exists for + * this region, a {@link NodeExistsException} will be thrown. + * + * @param region region to be created as offline + * @param serverName server event originates from + * @throws IOException + */ + @Override + public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName, + final HRegionInfo a, final HRegionInfo b) throws IOException { + LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName() + + " in PENDING_MERGE state")); + byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b); + RegionTransition rt = + RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), + serverName, payload); + String node = ZKAssign.getNodeName(watcher, region.getEncodedName()); + try { + if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) { + throw new IOException("Failed create of ephemeral " + node); + } + } catch (KeeperException e) { + throw new IOException(e); + } + } + + /* + * (non-Javadoc) + * @see + * org.apache.hadoop.hbase.regionserver.coordination.RegionMergeCoordination#clean(org.apache.hadoop + * .hbase.Server, org.apache.hadoop.hbase.HRegionInfo) + */ + @Override + public void clean(final HRegionInfo hri) { + try { + // Only delete if its in expected state; could have been hijacked. 
+ if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager + .getServer().getServerName())) { + ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager + .getServer().getServerName()); + } + } catch (KeeperException.NoNodeException e) { + LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); + } catch (KeeperException e) { + manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); + } + } + + /* + * ZooKeeper implementation of finishRegionMergeTransaction + */ + @Override + public void completeRegionMergeTransaction(final RegionServerServices services, + HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd, + HRegion mergedRegion) throws IOException { + ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd; + if (manager.getServer() == null + || manager.getServer().getCoordinatedStateManager() == null) { + return; + } + // Tell master about merge by updating zk. If we fail, abort. + try { + transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), + manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED); + + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + int spins = 0; + // Now wait for the master to process the merge. We know it's done + // when the znode is deleted. The reason we keep tickling the znode is + // that it's possible for the master to miss an event. 
+ do { + if (spins % 10 == 0) { + LOG.debug("Still waiting on the master to process the merge for " + + mergedRegionInfo.getEncodedName() + ", waited " + + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms"); + } + Thread.sleep(100); + // When this returns -1 it means the znode doesn't exist + transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), + manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED); + spins++; + } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped() + && !services.isStopping()); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException("Failed telling master about merge " + + mergedRegionInfo.getEncodedName(), e); + } + // Leaving here, the mergedir with its dross will be in place but since the + // merge was successful, just leave it; it'll be cleaned when region_a is + // cleaned up by CatalogJanitor on master + } + + /* + * Zookeeper implementation of region merge confirmation + */ + @Override + public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b, + ServerName serverName, RegionMergeDetails rmd) throws IOException { + transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING, + RS_ZK_REGION_MERGING); + } + + /* + * Zookeeper implementation of region merge processing + */ + @Override + public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, + ServerName sn, RegionMergeDetails rmd) throws IOException { + transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE, + EventType.RS_ZK_REGION_MERGING); + } + + /** + * Transitions an existing ephemeral node for the specified region which is + * currently in the begin state to be in the end state. Master cleans up the + * final MERGE znode when it reads it (or if we crash, zk will clean it up). + * + *
+ * Does not transition nodes from other states. If for some reason the node + * could not be transitioned, the znode version in details is set to -1. If the + * transition is successful, the version of the node after transition is stored in + * details. + * + *
+ * This method can fail and return false for three different reasons: + *
+ * Does not set any watches. + * + *
+ * This method should only be used by a RegionServer when merging two regions.
+ *
+ * @param merged the merged region whose znode is to be transitioned
+ * @param a merging region A
+ * @param b merging region B
+ * @param serverName server event originates from
+ * @param rmd region merge details
+ * @param beginState the expected current state the node should be
+ * @param endState the state to transition to
+ * @throws IOException
+ */
+ private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
+ ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
+ final EventType endState) throws IOException {
+ ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
+ byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
+ try {
+ zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
+ endState, zrmd.getZnodeVersion(), payload));
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 73b06b2..9f93e0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -60,10 +60,12 @@ import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
-import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -3538,11 +3539,14 @@ public class AssignmentManager extends ZooKeeperListener {
EventType et = rt.getEventType();
if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
try {
- if (RegionMergeTransaction.transitionMergingNode(watcher, p,
- hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
- EventType.RS_ZK_REGION_MERGING) == -1) {
+ RegionMergeCoordination.RegionMergeDetails std =
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().getDefaultDetails();
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
+ if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
byte[] data = ZKAssign.getData(watcher, encodedName);
- EventType currentType = null;
+ EventType currentType = null;
if (data != null) {
RegionTransition newRt = RegionTransition.parseFrom(data);
currentType = newRt.getEventType();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
index eedba2b..bd29104 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -13,15 +13,11 @@
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
+ * License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +31,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
-import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
@@ -44,19 +39,15 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
+import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.data.Stat;
/**
* Executes region merge as a "transaction". It is similar with
@@ -97,10 +88,9 @@ public class RegionMergeTransaction {
private final HRegion region_b;
// merges dir is under region_a
private final Path mergesdir;
- private int znodeVersion = -1;
// We only merge adjacent regions if forcible is false
private final boolean forcible;
- private boolean useZKForAssignment;
+ private boolean useCoordinationForAssignment;
/**
* Types to add to the transaction journal. Each enum is a step in the merge
@@ -110,7 +100,7 @@ public class RegionMergeTransaction {
/**
* Set region as in transition, set it into MERGING state.
*/
- SET_MERGING_IN_ZK,
+ SET_MERGING,
/**
* We created the temporary merge data directory.
*/
@@ -152,6 +142,8 @@ public class RegionMergeTransaction {
private RegionServerCoprocessorHost rsCoprocessorHost = null;
+ private RegionMergeDetails rmd;
+
/**
* Constructor
* @param a region a to merge
@@ -230,8 +222,7 @@ public class RegionMergeTransaction {
/**
* Run the transaction.
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
+ * @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
@@ -240,9 +231,15 @@ public class RegionMergeTransaction {
* @see #rollback(Server, RegionServerServices)
*/
public HRegion execute(final Server server,
- final RegionServerServices services) throws IOException {
- useZKForAssignment = server == null ? true :
- ConfigUtil.useZKForAssignment(server.getConfiguration());
+ final RegionServerServices services) throws IOException {
+ useCoordinationForAssignment =
+ server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
+ if (rmd == null) {
+ rmd =
+ server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server
+ .getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails()
+ : null;
+ }
if (rsCoprocessorHost == null) {
rsCoprocessorHost = server != null ?
((HRegionServer) server).getRegionServerCoprocessorHost() : null;
@@ -257,14 +254,20 @@ public class RegionMergeTransaction {
public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
HRegion mergedRegion) throws IOException {
openMergedRegion(server, services, mergedRegion);
- transitionZKNode(server, services, mergedRegion);
+ if (useCoordination(server)) {
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().completeRegionMergeTransaction(services, mergedRegionInfo,
+ region_a, region_b, rmd, mergedRegion);
+ }
+ if (rsCoprocessorHost != null) {
+ rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
+ }
return mergedRegion;
}
/**
* Prepare the merged region and region files.
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
+ * @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @return merged region
* @throws IOException If thrown, transaction failed. Call
@@ -286,7 +289,7 @@ public class RegionMergeTransaction {
}
}
- // If true, no cluster to write meta edits to or to update znodes in.
+ // If true, there is no cluster to write meta edits to or to coordinate with.
boolean testing = server == null ? true : server.getConfiguration()
.getBoolean("hbase.testing.nocluster", false);
@@ -320,7 +323,7 @@ public class RegionMergeTransaction {
// will determine whether the region is merged or not in case of failures.
// If it is successful, master will roll-forward, if not, master will
// rollback
- if (!testing && useZKForAssignment) {
+ if (!testing && useCoordinationForAssignment) {
if (metaEntries.isEmpty()) {
MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
.getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
@@ -328,7 +331,7 @@ public class RegionMergeTransaction {
mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
}
- } else if (services != null && !useZKForAssignment) {
+ } else if (services != null && !useCoordinationForAssignment) {
if (!services.reportRegionTransition(TransitionCode.MERGE_PONR,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
// Passed PONR, let SSH clean it up
@@ -377,17 +380,24 @@ public class RegionMergeTransaction {
public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
boolean testing) throws IOException {
- // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
- // have zookeeper so don't do zk stuff if server or zookeeper is null
- if (useZKAndZKIsSet(server)) {
+ if (rmd == null) {
+ rmd =
+ server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server
+ .getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails()
+ : null;
+ }
+
+ // If server doesn't have a coordination state manager, don't do coordination actions.
+ if (useCoordination(server)) {
try {
- createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
- server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
- } catch (KeeperException e) {
- throw new IOException("Failed creating PENDING_MERGE znode on "
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo,
+ server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
+ } catch (IOException e) {
+ throw new IOException("Failed to start region merge transaction for "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
- } else if (services != null && !useZKForAssignment) {
+ } else if (services != null && !useCoordinationForAssignment) {
if (!services.reportRegionTransition(TransitionCode.READY_TO_MERGE,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
throw new IOException("Failed to get ok from master to merge "
@@ -395,12 +405,14 @@ public class RegionMergeTransaction {
+ region_b.getRegionInfo().getRegionNameAsString());
}
}
- this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
- if (useZKAndZKIsSet(server)) {
+ this.journal.add(JournalEntry.SET_MERGING);
+ if (useCoordination(server)) {
// After creating the merge node, wait for master to transition it
// from PENDING_MERGE to MERGING so that we can move on. We want master
// knows about it and won't transition any region which is merging.
- znodeVersion = getZKNode(server, services);
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().waitForRegionMergeTransaction(services, mergedRegionInfo,
+ region_a, region_b, rmd);
}
this.region_a.getRegionFileSystem().createMergesDir();
@@ -420,16 +432,15 @@ public class RegionMergeTransaction {
// clean this up.
mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
- if (server != null && useZKAndZKIsSet(server)) {
+ if (useCoordination(server)) {
try {
- // Do one more check on the merging znode (before it is too late) in case
- // any merging region is moved somehow. If so, the znode transition will fail.
- this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
- this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
- server.getServerName(), this.znodeVersion,
- RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
- } catch (KeeperException e) {
- throw new IOException("Failed setting MERGING znode on "
+ // Do the final check in case any merging region is moved somehow. If so, the transition
+ // will fail.
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo,
+ region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd);
+ } catch (IOException e) {
+ throw new IOException("Failed setting MERGING on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
}
@@ -545,8 +556,7 @@ public class RegionMergeTransaction {
/**
* Perform time consuming opening of the merged region.
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
+ * @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @param merged the merged region
* @throws IOException If thrown, transaction failed. Call
@@ -569,7 +579,7 @@ public class RegionMergeTransaction {
if (services != null) {
try {
- if (useZKForAssignment) {
+ if (useCoordinationForAssignment) {
services.postOpenDeployTasks(merged, server.getCatalogTracker());
} else if (!services.reportRegionTransition(TransitionCode.MERGED,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
@@ -585,134 +595,6 @@ public class RegionMergeTransaction {
}
/**
- * Finish off merge transaction, transition the zknode
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
- * @param services Used to online/offline regions.
- * @throws IOException If thrown, transaction failed. Call
- * {@link #rollback(Server, RegionServerServices)}
- */
- void transitionZKNode(final Server server, final RegionServerServices services,
- HRegion mergedRegion) throws IOException {
- if (useZKAndZKIsSet(server)) {
- // Tell master about merge by updating zk. If we fail, abort.
- try {
- this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
- this.mergedRegionInfo, region_a.getRegionInfo(),
- region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
- RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
-
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- int spins = 0;
- // Now wait for the master to process the merge. We know it's done
- // when the znode is deleted. The reason we keep tickling the znode is
- // that it's possible for the master to miss an event.
- do {
- if (spins % 10 == 0) {
- LOG.debug("Still waiting on the master to process the merge for "
- + this.mergedRegionInfo.getEncodedName() + ", waited "
- + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
- }
- Thread.sleep(100);
- // When this returns -1 it means the znode doesn't exist
- this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
- this.mergedRegionInfo, region_a.getRegionInfo(),
- region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
- RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
- spins++;
- } while (this.znodeVersion != -1 && !server.isStopped()
- && !services.isStopping());
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new IOException("Failed telling master about merge "
- + mergedRegionInfo.getEncodedName(), e);
- }
- }
-
- if (rsCoprocessorHost != null) {
- rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
- }
-
- // Leaving here, the mergedir with its dross will be in place but since the
- // merge was successful, just leave it; it'll be cleaned when region_a is
- // cleaned up by CatalogJanitor on master
- }
-
- /**
- * Wait for the merging node to be transitioned from pending_merge
- * to merging by master. That's how we are sure master has processed
- * the event and is good with us to move on. If we don't get any update,
- * we periodically transition the node so that master gets the callback.
- * If the node is removed or is not in pending_merge state any more,
- * we abort the merge.
- */
- private int getZKNode(final Server server,
- final RegionServerServices services) throws IOException {
- // Wait for the master to process the pending_merge.
- try {
- int spins = 0;
- Stat stat = new Stat();
- ZooKeeperWatcher zkw = server.getZooKeeper();
- ServerName expectedServer = server.getServerName();
- String node = mergedRegionInfo.getEncodedName();
- while (!(server.isStopped() || services.isStopping())) {
- if (spins % 5 == 0) {
- LOG.debug("Still waiting for master to process "
- + "the pending_merge for " + node);
- transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
- region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
- RS_ZK_REQUEST_REGION_MERGE);
- }
- Thread.sleep(100);
- spins++;
- byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
- if (data == null) {
- throw new IOException("Data is null, merging node "
- + node + " no longer exists");
- }
- RegionTransition rt = RegionTransition.parseFrom(data);
- EventType et = rt.getEventType();
- if (et == RS_ZK_REGION_MERGING) {
- ServerName serverName = rt.getServerName();
- if (!serverName.equals(expectedServer)) {
- throw new IOException("Merging node " + node + " is for "
- + serverName + ", not us " + expectedServer);
- }
- byte [] payloadOfMerging = rt.getPayload();
- List
- * Does not transition nodes from other states. If a node already exists for
- * this region, a {@link NodeExistsException} will be thrown.
- *
- * @param zkw zk reference
- * @param region region to be created as offline
- * @param serverName server event originates from
- * @throws KeeperException
- * @throws IOException
- */
- public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
- final ServerName serverName, final HRegionInfo a,
- final HRegionInfo b) throws KeeperException, IOException {
- LOG.debug(zkw.prefix("Creating ephemeral node for "
- + region.getEncodedName() + " in PENDING_MERGE state"));
- byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
- RegionTransition rt = RegionTransition.createRegionTransition(
- RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
- String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
- if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
- throw new IOException("Failed create of ephemeral " + node);
- }
- }
- /**
- * Transitions an existing ephemeral node for the specified region which is
- * currently in the begin state to be in the end state. Master cleans up the
- * final MERGE znode when it reads it (or if we crash, zk will clean it up).
- *
- *
- * Does not transition nodes from other states. If for some reason the node
- * could not be transitioned, the method returns -1. If the transition is
- * successful, the version of the node after transition is returned.
- *
- *
- * This method can fail and return false for three different reasons:
- *
- * Does not set any watches.
- *
- *
- * This method should only be used by a RegionServer when merging two regions.
- *
- * @param zkw zk reference
- * @param merged region to be transitioned to opened
- * @param a merging region A
- * @param b merging region B
- * @param serverName server event originates from
- * @param znodeVersion expected version of data before modification
- * @param beginState the expected current state the znode should be
- * @param endState the state to be transition to
- * @return version of node after transition, -1 if unsuccessful transition
- * @throws KeeperException if unexpected zookeeper exception
- * @throws IOException
- */
- public static int transitionMergingNode(ZooKeeperWatcher zkw,
- HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
- final int znodeVersion, final EventType beginState,
- final EventType endState) throws KeeperException, IOException {
- byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
- return ZKAssign.transitionNode(zkw, merged, serverName,
- beginState, endState, znodeVersion, payload);
- }
/**
* Checks if the given region has merge qualifier in hbase:meta
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index de7ce1a..a8fbf54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.RegionState.State;
@@ -389,8 +390,8 @@ public class TestMasterFailover {
// Regions of table of merging regions
// Cause: Master was down while merging was going on
- RegionMergeTransaction.createNodeMerging(
- zkw, newRegion, mergingServer, a, b);
+ ((BaseCoordinatedStateManager) hrs.getCoordinatedStateManager())
+ .getRegionMergeCoordination().startRegionMergeTransaction(newRegion, mergingServer, a, b);
/*
* ZK = NONE
- *
- *
- *