diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index 6cee2df..9c9bfba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -67,4 +67,9 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan * Method to retrieve coordination for opening region operations. */ public abstract OpenRegionCoordination getOpenRegionCoordination(); -} + + /** + * Method to retrieve coordination for region merge transactions. + */ + public abstract RegionMergeCoordination getRegionMergeCoordination(); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java new file mode 100644 index 0000000..b51dd9c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.coordination; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * Coordination operations for region merge transaction. The operation should be coordinated at the + * following stages:
+ * 1. startRegionMergeTransaction - all preparation/initialization for merge region transaction
+ * 2. waitForRegionMergeTransaction - wait until coordination completes all work related + * to the merge
+ * 3. confirmRegionMergeTransaction - confirm that the merge could be completed and that none of + * the merging regions has moved
+ * 4. completeRegionMergeTransaction - all steps that are required to complete the transaction. + * Called after PONR (point of no return)
+ */ +@InterfaceAudience.Private +public interface RegionMergeCoordination { + + RegionMergeDetails getDefaultDetails(); + + /** + * Marker interface for implementation-specific region merge transaction details. + */ + public static interface RegionMergeDetails { + } + + /** + * Start the transaction that merges regions {@code a} and {@code b} into {@code region} + * @param region the merged region + * @param serverName server event originates from + * @throws IOException if the transaction could not be started + */ + void startRegionMergeTransaction(HRegionInfo region, ServerName serverName, HRegionInfo a, + HRegionInfo b) throws IOException; + + /** + * Wait until coordination has completed all work required before the merge can proceed + * @throws IOException If thrown, transaction failed. + */ + void waitForRegionMergeTransaction(RegionServerServices services, HRegionInfo mergedRegionInfo, + HRegion region_a, HRegion region_b, RegionMergeDetails details) throws IOException; + + /** + * Confirm that the region merge can be performed + * @param merged region + * @param a merging region A + * @param b merging region B + * @param serverName server event originates from + * @param rmd region merge details + * @throws IOException If thrown, transaction failed. + */ + void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b, + ServerName serverName, RegionMergeDetails rmd) throws IOException; + + /** Process a region merge request; called from AssignmentManager on the master. + * @param merged region + * @param a merging region A + * @param b merging region B + * @param serverName server event originates from + * @param rmd region merge details + * @throws IOException + */ + void processRegionMergeRequest(HRegionInfo merged, HRegionInfo a, HRegionInfo b, + ServerName serverName, RegionMergeDetails rmd) throws IOException; + + /** + * Finish off merge transaction + * @param services Used to online/offline regions. + * @param merged region + * @param region_a merging region A + * @param region_b merging region B + * @param rmd region merge details + * @param mergedRegion the merged region + * @throws IOException If thrown, transaction failed.
Call + * {@link RegionMergeTransaction#rollback(Server, RegionServerServices)} + */ + void completeRegionMergeTransaction(RegionServerServices services, HRegionInfo merged, + HRegion region_a, HRegion region_b, RegionMergeDetails rmd, HRegion mergedRegion) + throws IOException; + + /** + * This method is used during rollback + * @param merged region to be rolled back + */ + void clean(HRegionInfo merged); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 2c2fa44..a5492a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -38,6 +38,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { protected SplitTransactionCoordination splitTransactionCoordination; protected CloseRegionCoordination closeRegionCoordination; protected OpenRegionCoordination openRegionCoordination; + protected RegionMergeCoordination regionMergeCoordination; @Override public void initialize(Server server) { @@ -47,6 +48,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher); closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher); openRegionCoordination = new ZkOpenRegionCoordination(this, watcher); + regionMergeCoordination = new ZkRegionMergeCoordination(this, watcher); } @Override @@ -78,4 +80,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { public OpenRegionCoordination getOpenRegionCoordination() { return openRegionCoordination; } + + @Override + public RegionMergeCoordination getRegionMergeCoordination() { + return regionMergeCoordination; + } } diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java new file mode 100644 index 0000000..8c18821 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java @@ -0,0 +1,326 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.coordination; + +import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED; +import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING; +import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionTransition; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.data.Stat; + +public class ZkRegionMergeCoordination implements RegionMergeCoordination { + + private final CoordinatedStateManager manager; + private final ZooKeeperWatcher watcher; + + private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class); + + public ZkRegionMergeCoordination(CoordinatedStateManager manager, + ZooKeeperWatcher watcher) { + this.manager = manager; + this.watcher = watcher; + } + + /** + * ZK-based implementation. Holds the expected version of the merging znode, updated as the + * znode is transitioned; -1 is the default and also marks a failed transition (e.g. no znode).
+ */ + public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails { + private int znodeVersion; + + public ZkRegionMergeDetails() { + } + + public int getZnodeVersion() { + return znodeVersion; + } + + public void setZnodeVersion(int znodeVersion) { + this.znodeVersion = znodeVersion; + } + } + + @Override + public RegionMergeDetails getDefaultDetails() { + ZkRegionMergeDetails zstd = new ZkRegionMergeDetails(); + zstd.setZnodeVersion(-1); + return zstd; + } + + /** + * Wait for the merging node to be transitioned from pending_merge + * to merging by master. That's how we are sure master has processed + * the event and is good with us to move on. If we don't get any update, + * we periodically transition the node so that master gets the callback. + * If the node is removed or is not in pending_merge state any more, + * we abort the merge. On success the znode version is stored in details. + * @throws IOException if the merge is aborted, the server stops, or the wait is interrupted + */ + + @Override + public void waitForRegionMergeTransaction(RegionServerServices services, + HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details) + throws IOException { + try { + int spins = 0; + Stat stat = new Stat(); + ServerName expectedServer = manager.getServer().getServerName(); + String node = mergedRegionInfo.getEncodedName(); + ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details; + while (!(manager.getServer().isStopped() || services.isStopping())) { + if (spins % 5 == 0) { + LOG.debug("Still waiting for master to process " + "the pending_merge for " + node); + ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails(); + transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), + region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE, + RS_ZK_REQUEST_REGION_MERGE); + } + Thread.sleep(100); + spins++; + byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat); + if (data == null) { + throw new IOException("Data is null, merging node " + node + " no longer exists");
+ } + RegionTransition rt = RegionTransition.parseFrom(data); + EventType et = rt.getEventType(); + if (et == RS_ZK_REGION_MERGING) { + ServerName serverName = rt.getServerName(); + if (!serverName.equals(expectedServer)) { + throw new IOException("Merging node " + node + " is for " + serverName + ", not us " + + expectedServer); + } + byte[] payloadOfMerging = rt.getPayload(); + List<HRegionInfo> mergingRegions = + HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length); + assert mergingRegions.size() == 3; + HRegionInfo a = mergingRegions.get(1); + HRegionInfo b = mergingRegions.get(2); + HRegionInfo hri_a = region_a.getRegionInfo(); + HRegionInfo hri_b = region_b.getRegionInfo(); + if (!(hri_a.equals(a) && hri_b.equals(b))) { + throw new IOException("Merging node " + node + " is for " + a + ", " + b + + ", not expected regions: " + hri_a + ", " + hri_b); + } + // Master has processed it. + zdetails.setZnodeVersion(stat.getVersion()); + return; + } + if (et != RS_ZK_REQUEST_REGION_MERGE) { + throw new IOException("Merging node " + node + " moved out of merging to " + et); + } + } + // Server is stopping/stopped + throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped")); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException("Failed getting MERGING znode on " + + mergedRegionInfo.getRegionNameAsString(), e); + } + } + + /** + * Creates a new ephemeral node in the PENDING_MERGE state for the merged region. + * Create it ephemeral in case regionserver dies mid-merge. + * + *

+ * Does not transition nodes from other states. If a node already exists for + * this region, an {@link IOException} is thrown (any {@link NodeExistsException} is wrapped). + * + * @param region merged region; its PENDING_MERGE node carries region, a and b as payload + * @param serverName server event originates from + * @throws IOException if the ephemeral node could not be created + */ + @Override + public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName, + final HRegionInfo a, final HRegionInfo b) throws IOException { + LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName() + + " in PENDING_MERGE state")); + byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b); + RegionTransition rt = + RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), + serverName, payload); + String node = ZKAssign.getNodeName(watcher, region.getEncodedName()); + try { + if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) { + throw new IOException("Failed create of ephemeral " + node); + } + } catch (KeeperException e) { + throw new IOException(e); + } + } + + /* + * Deletes the merging znode if it is in REQUEST_MERGE state, otherwise if it is in MERGING + * state. A missing node is only logged; other ZooKeeper errors abort the server. + * @see org.apache.hadoop.hbase.coordination.RegionMergeCoordination#clean(org.apache.hadoop + * .hbase.HRegionInfo) + */ + @Override + public void clean(final HRegionInfo hri) { + try { + // Only delete if it's in expected state; could have been hijacked.
+ if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager + .getServer().getServerName())) { + ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager + .getServer().getServerName()); + } + } catch (KeeperException.NoNodeException e) { + LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); + } catch (KeeperException e) { + manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); + } + } + + /* + * ZooKeeper implementation of completeRegionMergeTransaction + */ + @Override + public void completeRegionMergeTransaction(final RegionServerServices services, + HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd, + HRegion mergedRegion) throws IOException { + ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd; + if (manager.getServer() == null + || manager.getServer().getCoordinatedStateManager() == null) { + return; + } + // Tell master about merge by updating zk. If we fail, abort. + try { + transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), + manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED); + + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + int spins = 0; + // Now wait for the master to process the merge. We know it's done + // when the znode is deleted. The reason we keep tickling the znode is + // that it's possible for the master to miss an event.
+ do { + if (spins % 10 == 0) { + LOG.debug("Still waiting on the master to process the merge for " + + mergedRegionInfo.getEncodedName() + ", waited " + + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms"); + } + Thread.sleep(100); + // The znode version in zrmd becomes -1 once the znode no longer exists + transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), + manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED); + spins++; + } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped() + && !services.isStopping()); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException("Failed telling master about merge " + + mergedRegionInfo.getEncodedName(), e); + } + // Leaving here, the mergedir with its dross will be in place but since the + // merge was successful, just leave it; it'll be cleaned when region_a is + // cleaned up by CatalogJanitor on master + } + + /* + * ZooKeeper implementation of region merge confirmation (MERGING to MERGING transition) + */ + @Override + public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b, + ServerName serverName, RegionMergeDetails rmd) throws IOException { + transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING, + RS_ZK_REGION_MERGING); + } + + /* + * ZooKeeper implementation of region merge processing (REQUEST_MERGE to MERGING transition) + */ + @Override + public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, + ServerName sn, RegionMergeDetails rmd) throws IOException { + transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE, + EventType.RS_ZK_REGION_MERGING); + } + + /** + * Transitions an existing ephemeral node for the specified region which is + * currently in the begin state to be in the end state. Master cleans up the + * final MERGE znode when it reads it (or if we crash, zk will clean it up). + * + *

+ * Does not transition nodes from other states. If for some reason the node + * could not be transitioned, the version in details is set to -1. If the transition is + * successful, the version of the node after transition is updated in details. + * + *

+ * This method can fail, leaving -1 in details, for three different reasons:
+ * <ul><li>Node for this region does not exist</li> + * <li>Node for this region is not in the begin state</li> + * <li>Node version does not match the expected version held in details</li></ul>
+ *

+ * Does not set any watches. + * + *

+ * Used by the RegionServer merging two regions and by the master via processRegionMergeRequest. + * + * @param merged merged region whose znode is transitioned + * @param a merging region A + * @param b merging region B + * @param serverName server event originates from + * @param rmd region merge details + * @param beginState the expected current state the node should be + * @param endState the state to transition to + * @throws IOException wrapping any ZooKeeper error + */ + private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b, + ServerName serverName, RegionMergeDetails rmd, final EventType beginState, + final EventType endState) throws IOException { + ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd; + byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b); + try { + zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState, + endState, zrmd.getZnodeVersion(), payload)); + } catch (KeeperException e) { + throw new IOException(e); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 73b06b2..9f93e0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -60,10 +60,12 @@ import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coordination.RegionMergeCoordination; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.OpenRegionCoordination; import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails; import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination; +import
org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; @@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; -import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -3538,11 +3539,14 @@ public class AssignmentManager extends ZooKeeperListener { EventType et = rt.getEventType(); if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) { try { - if (RegionMergeTransaction.transitionMergingNode(watcher, p, - hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE, - EventType.RS_ZK_REGION_MERGING) == -1) { + RegionMergeCoordination.RegionMergeDetails std = + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().getDefaultDetails(); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std); + if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) { byte[] data = ZKAssign.getData(watcher, encodedName); - EventType currentType = null; + EventType currentType = null; if (data != null) { RegionTransition newRt = RegionTransition.parseFrom(data); currentType = newRt.getEventType(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java index eedba2b..bd29104 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java @@ -13,15 +13,11 @@ * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -35,7 +31,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaMutationAnnotation; -import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; @@ -44,19 +39,15 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode; +import
org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails; import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.data.Stat; /** * Executes region merge as a "transaction". It is similar with @@ -97,10 +88,9 @@ public class RegionMergeTransaction { private final HRegion region_b; // merges dir is under region_a private final Path mergesdir; - private int znodeVersion = -1; // We only merge adjacent regions if forcible is false private final boolean forcible; - private boolean useZKForAssignment; + private boolean useCoordinationForAssignment; /** * Types to add to the transaction journal. Each enum is a step in the merge @@ -110,7 +100,7 @@ public class RegionMergeTransaction { /** * Set region as in transition, set it into MERGING state. */ - SET_MERGING_IN_ZK, + SET_MERGING, /** * We created the temporary merge data directory. */ @@ -152,6 +142,8 @@ public class RegionMergeTransaction { private RegionServerCoprocessorHost rsCoprocessorHost = null; + private RegionMergeDetails rmd; + /** * Constructor * @param a region a to merge @@ -230,8 +222,7 @@ public class RegionMergeTransaction { /** * Run the transaction. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) + * @param server Hosting server instance. Can be null when testing * @param services Used to online/offline regions. * @throws IOException If thrown, transaction failed.
Call * {@link #rollback(Server, RegionServerServices)} @@ -240,9 +231,15 @@ public class RegionMergeTransaction { * @see #rollback(Server, RegionServerServices) */ public HRegion execute(final Server server, - final RegionServerServices services) throws IOException { - useZKForAssignment = server == null ? true : - ConfigUtil.useZKForAssignment(server.getConfiguration()); + final RegionServerServices services) throws IOException { + useCoordinationForAssignment = + server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration()); + if (rmd == null) { + rmd = + server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server + .getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails() + : null; + } if (rsCoprocessorHost == null) { rsCoprocessorHost = server != null ? ((HRegionServer) server).getRegionServerCoprocessorHost() : null; @@ -257,14 +254,20 @@ public class RegionMergeTransaction { public HRegion stepsAfterPONR(final Server server, final RegionServerServices services, HRegion mergedRegion) throws IOException { openMergedRegion(server, services, mergedRegion); - transitionZKNode(server, services, mergedRegion); + if (useCoordination(server)) { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().completeRegionMergeTransaction(services, mergedRegionInfo, + region_a, region_b, rmd, mergedRegion); + } + if (rsCoprocessorHost != null) { + rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion); + } return mergedRegion; } /** * Prepare the merged region and region files. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) + * @param server Hosting server instance. Can be null when testing * @param services Used to online/offline regions. * @return merged region * @throws IOException If thrown, transaction failed.
Call @@ -286,7 +289,7 @@ public class RegionMergeTransaction { } } - // If true, no cluster to write meta edits to or to update znodes in. + // If true, no cluster to write meta edits to or to use coordination. boolean testing = server == null ? true : server.getConfiguration() .getBoolean("hbase.testing.nocluster", false); @@ -320,7 +323,7 @@ public class RegionMergeTransaction { // will determine whether the region is merged or not in case of failures. // If it is successful, master will roll-forward, if not, master will // rollback - if (!testing && useZKForAssignment) { + if (!testing && useCoordinationForAssignment) { if (metaEntries.isEmpty()) { MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a .getRegionInfo(), region_b.getRegionInfo(), server.getServerName()); @@ -328,7 +331,7 @@ public class RegionMergeTransaction { mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries); } - } else if (services != null && !useZKForAssignment) { + } else if (services != null && !useCoordinationForAssignment) { if (!services.reportRegionTransition(TransitionCode.MERGE_PONR, mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { // Passed PONR, let SSH clean it up @@ -377,17 +380,24 @@ public class RegionMergeTransaction { public HRegion stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing) throws IOException { - // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't - // have zookeeper so don't do zk stuff if server or zookeeper is null - if (useZKAndZKIsSet(server)) { + if (rmd == null) { + rmd = + server != null && server.getCoordinatedStateManager() != null ?
((BaseCoordinatedStateManager) server + .getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails() + : null; + } + + // If server doesn't have a coordination state manager, don't do coordination actions. + if (useCoordination(server)) { try { - createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo, - server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo()); - } catch (KeeperException e) { - throw new IOException("Failed creating PENDING_MERGE znode on " + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo, + server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo()); + } catch (IOException e) { + throw new IOException("Failed to start region merge transaction for " + this.mergedRegionInfo.getRegionNameAsString(), e); } - } else if (services != null && !useZKForAssignment) { + } else if (services != null && !useCoordinationForAssignment) { if (!services.reportRegionTransition(TransitionCode.READY_TO_MERGE, mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { throw new IOException("Failed to get ok from master to merge " @@ -395,12 +405,14 @@ public class RegionMergeTransaction { + region_b.getRegionInfo().getRegionNameAsString()); } } - this.journal.add(JournalEntry.SET_MERGING_IN_ZK); - if (useZKAndZKIsSet(server)) { + this.journal.add(JournalEntry.SET_MERGING); + if (useCoordination(server)) { // After creating the merge node, wait for master to transition it // from PENDING_MERGE to MERGING so that we can move on. We want master // knows about it and won't transition any region which is merging.
- znodeVersion = getZKNode(server, services); + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().waitForRegionMergeTransaction(services, mergedRegionInfo, + region_a, region_b, rmd); } this.region_a.getRegionFileSystem().createMergesDir(); @@ -420,16 +432,15 @@ public class RegionMergeTransaction { // clean this up. mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB); - if (server != null && useZKAndZKIsSet(server)) { + if (useCoordination(server)) { try { - // Do one more check on the merging znode (before it is too late) in case - // any merging region is moved somehow. If so, the znode transition will fail. - this.znodeVersion = transitionMergingNode(server.getZooKeeper(), - this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(), - server.getServerName(), this.znodeVersion, - RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING); - } catch (KeeperException e) { - throw new IOException("Failed setting MERGING znode on " + // Do the final check in case any merging region is moved somehow. If so, the transition + // will fail. + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo, + region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd); + } catch (IOException e) { + throw new IOException("Failed setting MERGING on " + this.mergedRegionInfo.getRegionNameAsString(), e); } } @@ -545,8 +556,7 @@ public class RegionMergeTransaction { /** * Perform time consuming opening of the merged region. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) + * @param server Hosting server instance. Can be null when testing * @param services Used to online/offline regions. * @param merged the merged region * @throws IOException If thrown, transaction failed.
Call @@ -569,7 +579,7 @@ public class RegionMergeTransaction { if (services != null) { try { - if (useZKForAssignment) { + if (useCoordinationForAssignment) { services.postOpenDeployTasks(merged, server.getCatalogTracker()); } else if (!services.reportRegionTransition(TransitionCode.MERGED, mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { @@ -585,134 +595,6 @@ public class RegionMergeTransaction { } /** - * Finish off merge transaction, transition the zknode - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @throws IOException If thrown, transaction failed. Call - * {@link #rollback(Server, RegionServerServices)} - */ - void transitionZKNode(final Server server, final RegionServerServices services, - HRegion mergedRegion) throws IOException { - if (useZKAndZKIsSet(server)) { - // Tell master about merge by updating zk. If we fail, abort. - try { - this.znodeVersion = transitionMergingNode(server.getZooKeeper(), - this.mergedRegionInfo, region_a.getRegionInfo(), - region_b.getRegionInfo(), server.getServerName(), this.znodeVersion, - RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED); - - long startTime = EnvironmentEdgeManager.currentTimeMillis(); - int spins = 0; - // Now wait for the master to process the merge. We know it's done - // when the znode is deleted. The reason we keep tickling the znode is - // that it's possible for the master to miss an event.
- do { - if (spins % 10 == 0) { - LOG.debug("Still waiting on the master to process the merge for " - + this.mergedRegionInfo.getEncodedName() + ", waited " - + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms"); - } - Thread.sleep(100); - // When this returns -1 it means the znode doesn't exist - this.znodeVersion = transitionMergingNode(server.getZooKeeper(), - this.mergedRegionInfo, region_a.getRegionInfo(), - region_b.getRegionInfo(), server.getServerName(), this.znodeVersion, - RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED); - spins++; - } while (this.znodeVersion != -1 && !server.isStopped() - && !services.isStopping()); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed telling master about merge " - + mergedRegionInfo.getEncodedName(), e); - } - } - - if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion); - } - - // Leaving here, the mergedir with its dross will be in place but since the - // merge was successful, just leave it; it'll be cleaned when region_a is - // cleaned up by CatalogJanitor on master - } - - /** - * Wait for the merging node to be transitioned from pending_merge - * to merging by master. That's how we are sure master has processed - * the event and is good with us to move on. If we don't get any update, - * we periodically transition the node so that master gets the callback. - * If the node is removed or is not in pending_merge state any more, - * we abort the merge. - */ - private int getZKNode(final Server server, - final RegionServerServices services) throws IOException { - // Wait for the master to process the pending_merge. 
- try { - int spins = 0; - Stat stat = new Stat(); - ZooKeeperWatcher zkw = server.getZooKeeper(); - ServerName expectedServer = server.getServerName(); - String node = mergedRegionInfo.getEncodedName(); - while (!(server.isStopped() || services.isStopping())) { - if (spins % 5 == 0) { - LOG.debug("Still waiting for master to process " - + "the pending_merge for " + node); - transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(), - region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE, - RS_ZK_REQUEST_REGION_MERGE); - } - Thread.sleep(100); - spins++; - byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat); - if (data == null) { - throw new IOException("Data is null, merging node " - + node + " no longer exists"); - } - RegionTransition rt = RegionTransition.parseFrom(data); - EventType et = rt.getEventType(); - if (et == RS_ZK_REGION_MERGING) { - ServerName serverName = rt.getServerName(); - if (!serverName.equals(expectedServer)) { - throw new IOException("Merging node " + node + " is for " - + serverName + ", not us " + expectedServer); - } - byte [] payloadOfMerging = rt.getPayload(); - List mergingRegions = HRegionInfo.parseDelimitedFrom( - payloadOfMerging, 0, payloadOfMerging.length); - assert mergingRegions.size() == 3; - HRegionInfo a = mergingRegions.get(1); - HRegionInfo b = mergingRegions.get(2); - HRegionInfo hri_a = region_a.getRegionInfo(); - HRegionInfo hri_b = region_b.getRegionInfo(); - if (!(hri_a.equals(a) && hri_b.equals(b))) { - throw new IOException("Merging node " + node + " is for " + a + ", " - + b + ", not expected regions: " + hri_a + ", " + hri_b); - } - // Master has processed it. - return stat.getVersion(); - } - if (et != RS_ZK_REQUEST_REGION_MERGE) { - throw new IOException("Merging node " + node - + " moved out of merging to " + et); - } - } - // Server is stopping/stopped - throw new IOException("Server is " - + (services.isStopping() ? 
"stopping" : "stopped")); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed getting MERGING znode on " - + mergedRegionInfo.getRegionNameAsString(), e); - } - } - - /** * Create reference file(s) of merging regions under the region_a merges dir * @param hstoreFilesOfRegionA * @param hstoreFilesOfRegionB @@ -769,14 +651,15 @@ public class RegionMergeTransaction { JournalEntry je = iterator.previous(); switch (je) { - case SET_MERGING_IN_ZK: - if (useZKAndZKIsSet(server)) { - cleanZK(server, this.mergedRegionInfo); - } else if (services != null && !useZKForAssignment + case SET_MERGING: + if (useCoordination(server)) { + ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) + .getRegionMergeCoordination().clean(this.mergedRegionInfo); + } else if (services != null && !useCoordinationForAssignment && !services.reportRegionTransition(TransitionCode.MERGE_REVERTED, mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { return false; - } + } break; case CREATED_MERGE_DIR: @@ -851,100 +734,12 @@ public class RegionMergeTransaction { return this.mergesdir; } - private boolean useZKAndZKIsSet(final Server server) { - return server != null && useZKForAssignment && server.getZooKeeper() != null; + private boolean useCoordination(final Server server) { + return server != null && useCoordinationForAssignment + && server.getCoordinatedStateManager() != null; } - private static void cleanZK(final Server server, final HRegionInfo hri) { - try { - // Only delete if its in expected state; could have been hijacked. 
- if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - RS_ZK_REQUEST_REGION_MERGE, server.getServerName())) { - ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - RS_ZK_REGION_MERGING, server.getServerName()); - } - } catch (KeeperException.NoNodeException e) { - LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); - } catch (KeeperException e) { - server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e); - } - } - - /** - * Creates a new ephemeral node in the PENDING_MERGE state for the merged region. - * Create it ephemeral in case regionserver dies mid-merge. - * - *

- * Does not transition nodes from other states. If a node already exists for - * this region, a {@link NodeExistsException} will be thrown. - * - * @param zkw zk reference - * @param region region to be created as offline - * @param serverName server event originates from - * @throws KeeperException - * @throws IOException - */ - public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region, - final ServerName serverName, final HRegionInfo a, - final HRegionInfo b) throws KeeperException, IOException { - LOG.debug(zkw.prefix("Creating ephemeral node for " - + region.getEncodedName() + " in PENDING_MERGE state")); - byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b); - RegionTransition rt = RegionTransition.createRegionTransition( - RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload); - String node = ZKAssign.getNodeName(zkw, region.getEncodedName()); - if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) { - throw new IOException("Failed create of ephemeral " + node); - } - } - /** - * Transitions an existing ephemeral node for the specified region which is - * currently in the begin state to be in the end state. Master cleans up the - * final MERGE znode when it reads it (or if we crash, zk will clean it up). - * - *

- * Does not transition nodes from other states. If for some reason the node - * could not be transitioned, the method returns -1. If the transition is - * successful, the version of the node after transition is returned. - * - *

- * This method can fail and return false for three different reasons: - *

- * - *

- * Does not set any watches. - * - *

- * This method should only be used by a RegionServer when merging two regions. - * - * @param zkw zk reference - * @param merged region to be transitioned to opened - * @param a merging region A - * @param b merging region B - * @param serverName server event originates from - * @param znodeVersion expected version of data before modification - * @param beginState the expected current state the znode should be - * @param endState the state to be transition to - * @return version of node after transition, -1 if unsuccessful transition - * @throws KeeperException if unexpected zookeeper exception - * @throws IOException - */ - public static int transitionMergingNode(ZooKeeperWatcher zkw, - HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName, - final int znodeVersion, final EventType beginState, - final EventType endState) throws KeeperException, IOException { - byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b); - return ZKAssign.transitionNode(zkw, merged, serverName, - beginState, endState, znodeVersion, payload); - } /** * Checks if the given region has merge qualifier in hbase:meta diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java index de7ce1a..a8fbf54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableStateManager; import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.RegionState.State; @@ -389,8 +390,8 @@ public class 
TestMasterFailover { // Regions of table of merging regions // Cause: Master was down while merging was going on - RegionMergeTransaction.createNodeMerging( - zkw, newRegion, mergingServer, a, b); + ((BaseCoordinatedStateManager) hrs.getCoordinatedStateManager()) + .getRegionMergeCoordination().startRegionMergeTransaction(newRegion, mergingServer, a, b); /* * ZK = NONE