### Eclipse Workspace Patch 1.0
#P apache-trunk
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (working copy)
@@ -117,6 +117,10 @@
+ regionInfo.getRegionNameAsString() + " but "
+ "this table is disabled, triggering close of region");
assignmentManager.unassign(regionInfo);
+ } else if (this.assignmentManager.isRegionInMerging(regionInfo)) {
+ LOG.debug("Opened region " + regionInfo.getRegionNameAsString()
+ + " but " + "this region is in merging, triggering close of region");
+ assignmentManager.unassign(regionInfo);
}
}
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java (revision 0)
@@ -0,0 +1,492 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.MetaScanner;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.CatalogJanitor.SplitParentFirstComparator;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Executes region merge as a "transaction". Call {@link #prepare()} to check
+ * whether the transaction is available, {@link #offlineRegion} to offline
+ * merging regions, {@link #executeMerging} to merge files on fs,
+ * {@link #completeMerging} and {@link #cancelMerging} to remove useless data.
+ *
+ *
+ * This class is not thread safe. Caller needs to ensure merge is run by one
+ * thread only.
+ */
+public class MergeTransaction {
+ private static final Log LOG = LogFactory.getLog(MergeTransaction.class);
+
+ private final HTableDescriptor htd;
+ private final HRegionInfo regionA;
+ private final HRegionInfo regionB;
+ // Region C merged from regionA and regionB
+ private HRegionInfo mergedRegion;
+ private final MergeTransactionData transactionData;
+ private final RegionMergeManager mergeManager;
+
+ // Which state does this merge transaction start from
+ private final JournalState initialState;
+ // Current state of this merge transaction
+ protected JournalState currentState;
+
+ private boolean successful = false;
+
+ private final Configuration conf;
+ private final Path rootDir;
+ private final FileSystem fs;
+
+ private final MonitoredTask status;
+
+ /**
+ * State of the transaction journal. Each enum is a state in the merge
+ * transaction. Used to redo after master restart
+ */
+ enum JournalState {
+ /**
+ * Initial state when requesting a merge
+ */
+ CREATE_MERGE,
+ /**
+ * Enter the offline region state after {@link #prepare()}
+ */
+ OFFLINE_REGION,
+ /**
+ * Enter the state executing merge after {@link #offlineRegion()}
+ */
+ EXECUTE_MERGING,
+ /**
+ * Enter the state completing merging after region merged
+ */
+ COMPLETE_MERGING,
+ /**
+ * Enter the state canceling merging if merge transaction is unavailable
+ */
+ CANCEL_MERGING,
+ /**
+ * State representing transaction being over
+ */
+ END
+ }
+
+ public MergeTransaction(final MasterServices masterServices,
+ final RegionMergeManager mergeManager,
+ final MergeTransactionData transactionData, final HRegionInfo mergeA,
+ final HRegionInfo mergeB, final JournalState initialState,
+ final MonitoredTask status) throws FileNotFoundException, IOException {
+ this.conf = masterServices.getConfiguration();
+ this.rootDir = masterServices.getMasterFileSystem().getRootDir();
+ this.fs = masterServices.getMasterFileSystem().getFileSystem();
+ this.mergeManager = mergeManager;
+ this.transactionData = transactionData;
+ this.htd = masterServices.getTableDescriptors().get(
+ transactionData.getTableName());
+ this.regionA = mergeA;
+ this.regionB = mergeB;
+ this.initialState = initialState;
+ this.currentState = initialState;
+ this.status = status;
+ }
+
+ /**
+ * Does checks for merge inputs.
+ *
+ * Set state to OFFLINE_REGION if the merge is available else set the state to
+ * CANCEL_MERGING if it is not(e.g. merge region is a parent or has
+ * reference).
+ * @throws IOException
+ * @throws KeeperException
+ * @throws NoNodeException
+ */
+ void prepare() throws IOException, NoNodeException, KeeperException {
+ status.setStatus("Preparing...");
+ boolean available = true;
+ if (regionA.isSplit() || regionA.isOffline()) {
+ available = false;
+ LOG.debug("Region " + regionA.getRegionNameAsString()
+ + " is split or offline");
+ } else if (regionB.isSplit() || regionB.isOffline()) {
+ available = false;
+ LOG.debug("Region " + regionB.getRegionNameAsString()
+ + " is split or offline");
+ } else if (containsReferenceFile(regionA)) {
+ available = false;
+ LOG.debug("Region " + regionA.getRegionNameAsString()
+ + " has reference , cancel the merge");
+ } else if (containsReferenceFile(regionB)) {
+ available = false;
+ LOG.debug("Region " + regionB.getRegionNameAsString()
+ + " has reference , cancel the merge");
+ } else if (!ensureNoParentRegion()) {
+ available = false;
+ LOG.debug("Exist parent region and can't be cleared, cancel the merge");
+ }
+ if (available) {
+ this.currentState = JournalState.OFFLINE_REGION;
+ } else {
+ this.currentState = JournalState.CANCEL_MERGING;
+ }
+ setJournalStateOnZK();
+ }
+
+ /**
+ * Run the transaction according to the current state
+ * @param masterServices
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void execute(MasterServices masterServices) throws IOException,
+ KeeperException, InterruptedException {
+ while (!masterServices.isStopped()) {
+ if (this.currentState.equals(JournalState.END)) {
+ break;
+ }
+ switch (this.currentState) {
+ case CREATE_MERGE:
+ prepare();
+ break;
+ case OFFLINE_REGION:
+ offlineRegion(masterServices);
+ break;
+ case EXECUTE_MERGING:
+ executeMerging(masterServices);
+ break;
+ case COMPLETE_MERGING:
+ completeMerging(masterServices);
+ break;
+ case CANCEL_MERGING:
+ cancelMerging(masterServices);
+ break;
+ default:
+ throw new RuntimeException("Unhandled merge state: " + currentState);
+ }
+ }
+ }
+
+ /**
+ * Offline the two merging regions
+ * @param masterServices
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ void offlineRegion(MasterServices masterServices) throws KeeperException,
+ InterruptedException {
+ status.setStatus("Offline region...");
+ masterServices.getAssignmentManager().unassign(this.regionA);
+ masterServices.getAssignmentManager().unassign(this.regionB);
+ mergeManager.waitUntilRegionsOffline(this.regionA, this.regionB);
+ if (mergeManager.isRegionSplit(this.regionA)
+ || mergeManager.isRegionSplit(this.regionB)) {
+ LOG.info("Canceling merge because region "
+ + (mergeManager.isRegionSplit(this.regionA) ? regionA
+ .getEncodedName() : regionB.getEncodedName()) + " has been split");
+ this.currentState = JournalState.CANCEL_MERGING;
+ } else {
+ this.currentState = JournalState.EXECUTE_MERGING;
+ }
+ setJournalStateOnZK();
+ }
+
+ /**
+ * Merging the files in HDFS, add merged region to META and assign it
+ * @param masterServices
+ * @throws NoNodeException
+ * @throws KeeperException
+ * @throws IOException
+ */
+ void executeMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, IOException {
+ status.setStatus("Merging regions in fs...");
+ this.mergedRegion = HRegion.generateFixedRegionByMerge(htd, regionA,
+ regionB);
+ Pair<HRegionInfo, ServerName> mergedRegionInMeta = null;
+ boolean isMergedRegionSplit = false;
+ if (this.initialState == JournalState.EXECUTE_MERGING) {
+ // It is a redo from this step
+ mergedRegionInMeta = MetaReader.getRegion(
+ masterServices.getCatalogTracker(), mergedRegion.getRegionName());
+ if (mergedRegionInMeta == null) {
+ isMergedRegionSplit = checkRegionSplit(
+ masterServices.getCatalogTracker(), mergedRegion);
+ LOG.debug("Redo merging " + transactionData + "from " + initialState
+ + "and check whether merged region"
+ + mergedRegion.getRegionNameAsString() + " is split ="
+ + isMergedRegionSplit);
+ }
+ }
+ if (!isMergedRegionSplit && mergedRegionInMeta == null) {
+ this.mergedRegion = HRegion.merge(fs, rootDir, htd, regionA, regionB);
+ MetaEditor.addRegionToMeta(masterServices.getCatalogTracker(),
+ this.mergedRegion);
+ }
+ status.setStatus("Assigning merged region "
+ + mergedRegion.getRegionNameAsString());
+ if (!isMergedRegionSplit
+ && (mergedRegionInMeta == null || mergedRegionInMeta.getSecond() == null)) {
+ if (mergeManager.isRegionOffline(mergedRegion)) {
+ masterServices.getAssignmentManager().assign(this.mergedRegion, true);
+ }
+ }
+ this.currentState = JournalState.COMPLETE_MERGING;
+ setJournalStateOnZK();
+ }
+
+ /**
+ * Delete merging regions from META after completing the merge successfully
+ * @param masterServices
+ * @throws KeeperException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void completeMerging(MasterServices masterServices) throws KeeperException,
+ IOException, InterruptedException {
+ status.setStatus("Completing merging... ");
+ // Remove regions from .META.
+ if (this.regionA != null) {
+ MetaEditor.deleteRegion(masterServices.getCatalogTracker(), this.regionA);
+ }
+ if (this.regionB != null) {
+ MetaEditor.deleteRegion(masterServices.getCatalogTracker(), this.regionB);
+ }
+ this.mergeManager.waitUntilNoDeadServerInProcess(masterServices);
+ this.mergeManager.finishMergeTransaction(transactionData, true);
+ this.currentState = JournalState.END;
+ this.successful = true;
+ }
+
+ /**
+ * Merge is unavailable, cancel it; Assign the merging region if it is offline
+ * because of merging
+ * @param masterServices
+ * @throws NoNodeException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ void cancelMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, InterruptedException {
+ status.setStatus("Canceling merging... ");
+ this.mergeManager.assignMergingRegionIfOffline(this.regionA);
+ this.mergeManager.assignMergingRegionIfOffline(this.regionB);
+ this.mergeManager.finishMergeTransaction(transactionData, true);
+ this.currentState = JournalState.END;
+ }
+
+ protected void setJournalStateOnZK() throws NoNodeException,
+ KeeperException {
+ mergeManager.setJournalStateOnZK(transactionData.getTransactionName(),
+ this.currentState);
+ }
+
+ /**
+ * Check whether region has Reference file
+ * @param region
+ * @return true if region has reference file
+ * @throws IOException
+ */
+ private boolean containsReferenceFile(final HRegionInfo region)
+ throws IOException {
+ Path tabledir = new Path(rootDir, region.getTableNameAsString());
+ boolean hasReference = false;
+ for (HColumnDescriptor family : htd.getFamilies()) {
+ Path p = HStore.getStoreHomedir(tabledir, region.getEncodedName(),
+ family.getName());
+ if (!fs.exists(p)) continue;
+ // Look for reference files. Call listStatus with anonymous instance of
+ // PathFilter.
+ FileStatus[] ps = FSUtils.listStatus(fs, p, new PathFilter() {
+ public boolean accept(Path path) {
+ return StoreFile.isReference(path);
+ }
+ });
+
+ if (ps != null && ps.length > 0) {
+ hasReference = true;
+ break;
+ }
+ }
+ return hasReference;
+ }
+
+ /**
+ * Between offline region and start merging, make sure there is no parent
+ * region of the two regions in .META. If exists, fixing up daughters would
+ * cause daughter regions(we have merged one) online again when we restart
+ * master, so we clear the parent region in this function to prevent the above
+ * case
+ * @return true if there is no parent region of merging regions
+ * @throws IOException
+ */
+ private boolean ensureNoParentRegion() throws IOException {
+ Map<HRegionInfo, Result> splitParents = mergeManager
+ .getSplitParents(regionA.getTableName());
+ for (Map.Entry<HRegionInfo, Result> splitParent : splitParents.entrySet()) {
+ // check whether its range contains merging region
+ if (doRegionsOverlap(regionA, splitParent.getKey())
+ || doRegionsOverlap(regionB, splitParent.getKey())) {
+ LOG.warn("Region " + regionA.getEncodedName() + " or region "
+ + regionB.getEncodedName() + " has parent region "
+ + splitParent.getKey().getRegionNameAsString()
+ + ", try to clean it");
+ if (!this.mergeManager.cleanParent(splitParent.getKey(),
+ splitParent.getValue())) {
+ LOG.warn("Failed cleaning parent region "
+ + splitParent.getKey().getRegionNameAsString()
+ + ", canceling merge");
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Check whether the region is split from the META
+ * @param catalogTracker
+ * @param checkRegion
+ *
+ * @return true if region is split
+ * @throws IOException
+ */
+ static boolean checkRegionSplit(final CatalogTracker catalogTracker,
+ final HRegionInfo checkRegion) throws IOException {
+ final AtomicBoolean isSplit = new AtomicBoolean(false);
+ byte[] startRow = checkRegion.getRegionName();
+ final byte[] startKey = checkRegion.getStartKey();
+ final byte[] endKey = checkRegion.getEndKey();
+ final byte[] tableName = checkRegion.getTableName();
+ MetaReader.fullScan(catalogTracker, new MetaReader.Visitor() {
+ @Override
+ public boolean visit(Result r) throws IOException {
+ if (r == null || r.isEmpty()) return true;
+ HRegionInfo info = HRegionInfo.getHRegionInfo(r);
+ if (info == null) return true; // Keep scanning
+ if (Bytes.compareTo(info.getTableName(), tableName) > 0) return false;// other table,stop scanning
+ if (Bytes.compareTo(info.getTableName(), tableName) == 0) {
+ if (Bytes.compareTo(info.getStartKey(), startKey) == 0) {
+ if (Bytes.compareTo(info.getEndKey(), endKey) == 0) {
+ // find itself,keep scanning
+ return true;
+ }
+ isSplit.set(true);
+ return false;
+ } else if (Bytes.compareTo(info.getStartKey(), startKey) > 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }, startRow);
+ return isSplit.get();
+ }
+
+ public boolean isSuccessful() {
+ return this.successful;
+ }
+
+ public HRegionInfo getMergedRegion() {
+ return this.mergedRegion;
+ }
+
+ public JournalState getCurrentState() {
+ return this.currentState;
+ }
+
+ /**
+ * Check whether there is overlap between region A and region B
+ * @param regionA
+ * @param regionB
+ * @return true if regions' key ranges overlap
+ */
+ private static boolean doRegionsOverlap(HRegionInfo regionA,
+ HRegionInfo regionB) {
+ // if A's startKey < B's startkey, swap them
+ if (Bytes.compareTo(regionA.getStartKey(), regionB.getStartKey()) < 0) {
+ return doRegionsOverlap(regionB, regionA);
+ } else {
+ if (Bytes.compareTo(regionA.getStartKey(), regionB.getEndKey()) < 0
+ || Bytes.equals(regionB.getEndKey(), HConstants.EMPTY_END_ROW)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Get the bytes for a specified JournalState
+ * @param state
+ * @return the bytes of given journal state
+ */
+ static byte[] getBytesFromJournalState(JournalState state) {
+ return Bytes.toBytes(state.toString());
+
+ }
+
+ /**
+ * Get a JournalState from bytes. Throws a {@link RuntimeException} if the
+ * data represents unknown bytes
+ * @param data
+ * @return the journal state as the given bytes
+ */
+ static JournalState parseJournalStateFromBytes(byte[] data) {
+ String stateString = Bytes.toString(data);
+ for (JournalState state : JournalState.values()) {
+ if (state.toString().equals(stateString))
+ return state;
+ }
+ throw new RuntimeException("Unknown JournalState " + stateString);
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy)
@@ -102,6 +102,8 @@
protected final Server server;
private ServerManager serverManager;
+
+ private RegionMergeManager mergeManager;
private CatalogTracker catalogTracker;
@@ -1024,6 +1026,11 @@
LOG.info("Opened region " + regionNameStr
+ "but this table is disabled, triggering close of region");
unassign(regionInfo);
+ } else if (isRegionInMerging(regionInfo)) {
+ LOG.debug("Opened region "
+ + regionInfo.getRegionNameAsString() + " but "
+ + "this region is in merging, triggering close of region");
+ unassign(regionInfo);
}
}
}
@@ -2163,6 +2170,8 @@
+ Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan(
catalogTracker, disabledOrDisablingOrEnabling, true);
if (allRegions == null || allRegions.isEmpty()) return;
+ // Remove the merging regions
+ removeMergingRegions(allRegions);
// Determine what type of assignment to do on startup
boolean retainAssignment = server.getConfiguration().
@@ -2715,6 +2724,9 @@
public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
final HRegionInfo a, final HRegionInfo b) {
regionOffline(parent);
+ if (this.mergeManager != null) {
+ this.mergeManager.notifyRegionSplit(parent);
+ }
regionOnline(a, sn);
regionOnline(b, sn);
@@ -2796,4 +2808,90 @@
}
return true;
}
+
+ void setMergeManager(RegionMergeManager mergeManager) {
+ assert this.mergeManager == null;
+ this.mergeManager = mergeManager;
+ }
+
+ public RegionMergeManager getMergeManager() {
+ return this.mergeManager;
+ }
+
+ public boolean isRegionInMerging(HRegionInfo regionInfo) {
+ return this.mergeManager != null
+ && this.mergeManager.isRegionInMerging(regionInfo);
+ }
+
+ /**
+ * Offline the specified region which is merging now
+ * @param regionInfo
+ */
+ public void offlineMergingRegion(HRegionInfo regionInfo) {
+ // Region is merging so should not be reassigned, just delete the CLOSED
+ // node
+ LOG.debug("Region is in merging so deleting ZK node and removing from "
+ + "regions in transition, skipping assignment of region "
+ + regionInfo.getRegionNameAsString());
+ try {
+ if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
+ // Could also be in OFFLINE mode
+ ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
+ }
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.debug("Tried to delete closed node for " + regionInfo + " but it "
+ + "does not exist so just offlining");
+ } catch (KeeperException e) {
+ this.server.abort("Error deleting CLOSED node in ZK for region "
+ + regionInfo.getEncodedName(), e);
+ }
+ regionOffline(regionInfo);
+ if (this.mergeManager != null) {
+ this.mergeManager.notifyRegionOffline(regionInfo);
+ }
+ }
+
+ /**
+ * Remove the merging regions from the specified map of regions
+ * @param regions
+ */
+ private void removeMergingRegions(Map<HRegionInfo, ServerName> regions) {
+ if (this.mergeManager != null && this.mergeManager.isAnyRegionInMerging()) {
+ Iterator<HRegionInfo> iterator = regions.keySet().iterator();
+ while (iterator.hasNext()) {
+ if (this.mergeManager.isRegionInMerging(iterator.next())) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the merging region for the specified list of regions
+ * @param regions
+ * @return list of region which are merging
+ */
+ public List<HRegionInfo> getMergingRegions(List<HRegionInfo> regions) {
+ List<HRegionInfo> regionsInMerging = new ArrayList<HRegionInfo>();
+ if (this.mergeManager != null && this.mergeManager.isAnyRegionInMerging()) {
+ for (HRegionInfo region : regions) {
+ if (this.mergeManager.isRegionInMerging(region)) {
+ regionsInMerging.add(region);
+ }
+ }
+ }
+ return regionsInMerging;
+ }
+
+ /**
+ * Get the merging region num for the specified table
+ * @param tableName
+ * @return number of merging regions of given table
+ */
+ public int getNumOfMergingRegionsForTable(String tableName) {
+ if (this.mergeManager != null) {
+ return mergeManager.getNumOfMergingRegionsForTable(tableName);
+ }
+ return 0;
+ }
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java (revision 0)
@@ -0,0 +1,365 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Test the {@link MergeTransaction} class against two merging regions.
+ */
+@Category(MediumTests.class)
+public class TestMergeTransaction {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Log LOG = LogFactory.getLog(TestMergeTransaction.class);
+ private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+ private static byte[] ROW = Bytes.toBytes("testRow");
+ private static final int INITIAL_REGION_NUM = 4;
+ private static final int ROWSIZE = 20;
+ private static final int rowSeperator1 = 5;
+ private static final int rowSeperator2 = 10;
+ private static final int rowSeperator3 = 15;
+ private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ // Throw exception in certain step, simulating redo
+ private static int throwExceptionStep = 0;
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ TEST_UTIL.getConfiguration().setClass(
+ "hbase.master.merge.transaction.impl", TestingMergeTransaction.class,
+ MergeTransaction.class);
+ TEST_UTIL.getConfiguration().setInt("hbase.merge.number.tries", 1);
+ // Start a cluster of two regionservers.
+ TEST_UTIL.startMiniCluster(2);
+ }
+
+ @AfterClass
+ public static void afterAllTests() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testWholesomeMerge() throws Exception {
+ LOG.info("Starting testWholesomeMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testWholesomeMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Merge 2nd and 3rd region
+ mergeAndVerify(master, tablename, 1, 2, INITIAL_REGION_NUM - 1);
+
+ // Merge 1st and 2nd region
+ mergeAndVerify(master, tablename, 0, 1, INITIAL_REGION_NUM - 2);
+
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenOfflineRegion() throws Exception {
+ LOG.info("Starting testRedoMergeWhenOfflineRegion");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenOfflineRegion");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when offline region
+ throwExceptionStep = 1;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ throwExceptionStep = 5;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ throwExceptionStep = 0;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenExecuteMerge() throws Exception {
+ LOG.info("Starting testRedoMergeWhenExecuteMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenExecuteMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when execute merge
+ throwExceptionStep = 2;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ throwExceptionStep = 5;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ throwExceptionStep = 0;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenCompleteMerge() throws Exception {
+ LOG.info("Starting testRedoMergeWhenCompleteMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenCompleteMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when complete merge
+ throwExceptionStep = 3;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+ throwExceptionStep = 0;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenCancelMerge() throws Exception {
+ LOG.info("Starting testRedoMergeWhenCancelMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenCancelMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when cancel merge
+ throwExceptionStep = 4;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+ throwExceptionStep = 0;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM);
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ /**
+ * An MergeTransaction instance used in this test.
+ */
+ public static class TestingMergeTransaction extends MergeTransaction {
+
+ public TestingMergeTransaction(MasterServices masterServices,
+ RegionMergeManager mergeManager, MergeTransactionData transactionData,
+ HRegionInfo mergeA, HRegionInfo mergeB, JournalState initialState,
+ MonitoredTask status) throws FileNotFoundException, IOException {
+ super(masterServices, mergeManager, transactionData, mergeA, mergeB,
+ initialState, status);
+ }
+
+ @Override
+ void offlineRegion(MasterServices masterServices) throws KeeperException, InterruptedException {
+ throwExceptionIfTest(1);
+ super.offlineRegion(masterServices);
+ }
+
+ @Override
+ void executeMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, IOException {
+ throwExceptionIfTest(2);
+ super.executeMerging(masterServices);
+ }
+
+ @Override
+ void completeMerging(MasterServices masterServices) throws KeeperException,
+ IOException, InterruptedException {
+ throwExceptionIfTest(3);
+ super.completeMerging(masterServices);
+ }
+
+ @Override
+ void cancelMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, InterruptedException {
+ throwExceptionIfTest(4);
+ super.cancelMerging(masterServices);
+ }
+
+ @Override
+ void prepare() throws IOException, NoNodeException, KeeperException {
+ super.prepare();
+ if (throwExceptionStep == 4) {
+ super.currentState = JournalState.CANCEL_MERGING;
+ super.setJournalStateOnZK();
+ }
+ }
+
+ @Override
+ protected void setJournalStateOnZK() throws NoNodeException,
+ KeeperException {
+ throwExceptionIfTest(5);
+ super.setJournalStateOnZK();
+ }
+
+ void throwExceptionIfTest(int step) {
+ if(throwExceptionStep==step){
+ throw new RuntimeException("Test");
+ }
+ }
+
+ }
+
+ private HTable createTableAndLoadData(HMaster master, byte[] tablename)
+ throws Exception {
+ HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME);
+ TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), table, FAMILYNAME,
+ new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],ROWS[rowSeperator2], ROWS[rowSeperator3] });
+ loadData(table);
+ verifyRowCount(table, ROWSIZE);
+
+ // sleep here is an ugly hack to allow region transitions to finish
+ long timeout = System.currentTimeMillis() + (15 * 1000);
+ List<Pair<HRegionInfo, ServerName>> tableRegions;
+ while (System.currentTimeMillis() < timeout) {
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ if (tableRegions.size() == 4) break;
+ Thread.sleep(250);
+ }
+
+
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
+ assertEquals(4, tableRegions.size());
+
+ return table;
+ }
+
+ private void loadData(HTable table) throws IOException {
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ put.add(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
+ table.put(put);
+ }
+ }
+
+ private void verifyRowCount(HTable table, int expectedRegionNum)
+ throws IOException {
+ ResultScanner scanner = table.getScanner(new Scan());
+ int rowCount = 0;
+ while (scanner.next() != null) {
+ rowCount++;
+ }
+ assertEquals(expectedRegionNum, rowCount);
+ scanner.close();
+ }
+
+
+ private void mergeAndVerify(HMaster master, byte[] tablename, int regionAnum,
+ int regionBnum, int expectedRegionNum) throws Exception {
+ requestMergeRegion(master, tablename, regionAnum, regionBnum);
+ waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
+
+ }
+
+ private void requestMergeRegion(HMaster master, byte[] tablename,
+ int regionAnum, int regionBnum) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
+ .getTableRegionsAndLocations(master.getCatalogTracker(),
+ Bytes.toString(tablename));
+ RegionMergeManager.createMergeRequest(master.getZooKeeper(),
+ Bytes.toString(tablename), tableRegions.get(regionAnum).getFirst()
+ .getEncodedName(), tableRegions.get(regionBnum).getFirst()
+ .getEncodedName(), false);
+ }
+
+ private void waitAndVerifyRegionNum(HMaster master, byte[] tablename,
+ int expectedRegionNum) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> tableRegions;
+ long timeout = System.currentTimeMillis() + (15 * 1000);
+ while (System.currentTimeMillis() < timeout) {
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ if (tableRegions.size() == expectedRegionNum)break;
+ Thread.sleep(250);
+ }
+
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegions));
+ assertEquals(expectedRegionNum, tableRegions.size());
+ }
+
+ private void waitAndVerifyResubmitQueueSize(HMaster master, int expectedSize)
+ throws Exception {
+ int queueSize;
+ long timeout = System.currentTimeMillis() + (15 * 1000);
+ while (System.currentTimeMillis() < timeout) {
+ queueSize = master.getAssignmentManager().getMergeManager().resubmitQueusSize();
+ if (queueSize == expectedSize) break;
+ Thread.sleep(250);
+ }
+
+ queueSize = master.getAssignmentManager().getMergeManager().resubmitQueusSize();
+ assertEquals(expectedSize, queueSize);
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+ }
+ return ret;
+ }
+
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (working copy)
@@ -393,6 +393,11 @@
+ " is disabled. Hence not assigning region" + hri.getEncodedName());
return false;
}
+ if (assignmentManager.isRegionInMerging(hri)) {
+ LOG.info("Region " + hri.getRegionNameAsString()
+ + " is merging. Hence not assigning it");
+ return false;
+ }
return true;
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -269,6 +269,9 @@
// Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker;
+ // manager of region merge transactions
+ private RegionMergeManager regionMergeManager;
+
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
// operations/debugging.
@@ -493,6 +496,7 @@
if (this.catalogTracker != null) this.catalogTracker.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
+ if (this.regionMergeManager != null) this.regionMergeManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
this.zooKeeper.close();
}
@@ -541,6 +545,9 @@
this.catalogTracker, this.balancer, this.executorService, this.metricsMaster);
zooKeeper.registerListenerFirst(assignmentManager);
+ this.regionMergeManager = new RegionMergeManager(this, this.zooKeeper);
+ this.regionMergeManager.start();
+
this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
this.serverManager);
this.regionServerTracker.start();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy)
@@ -137,6 +137,8 @@
.getTableRegionsAndLocations(this.ct, tableName, true);
int countOfRegionsInTable = tableRegionsAndLocations.size();
List<HRegionInfo> regions = regionsToAssignWithServerName(tableRegionsAndLocations);
+ List<HRegionInfo> regionsInMerging = assignmentManager.getMergingRegions(regions);
+ regions.removeAll(regionsInMerging);
int regionsCount = regions.size();
if (regionsCount == 0) {
done = true;
@@ -266,7 +268,8 @@
}
private boolean isDone(final List regions) {
- return regions != null && regions.size() >= this.countOfRegionsInTable;
+ return regions != null && (regions.size() + assignmentManager
+ .getNumOfMergingRegionsForTable(tableNameStr)) >= this.countOfRegionsInTable;
}
}
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0)
@@ -0,0 +1,237 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.master.RegionMergeManager;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility that can merge any two regions in the same table: adjacent,
+ * overlapping or disjoint. Different from {@link Merge}, it needn't shutdown
+ * cluster or disable table;
+ *
+ */
+public class OnlineMerge extends Configured implements Tool {
+ static final Log LOG = LogFactory.getLog(OnlineMerge.class);
+
+ private String tableName; // Name of table
+ private String region1EncodedName; // EncodedName of region 1
+ private String region2EncodedName; // EncodedName of region 2
+ private HRegionInfo region1Info; // Info of region 1
+ private HRegionInfo region2Info; // Info of region 2
+ private HRegionInfo mergedRegionInfo; // Info of merged region
+ private HTableDescriptor htd;
+ private boolean isForce = false;
+ private boolean sync = true;
+ private CatalogTracker catalogTracker;
+
+ /** default constructor */
+ public OnlineMerge() {
+ super();
+ }
+
+ /**
+ * @param conf configuration
+ */
+ public OnlineMerge(Configuration conf) {
+ setConf(conf);
+ }
+
+ private int parseArgs(String[] args) throws IOException {
+ GenericOptionsParser parser = new GenericOptionsParser(getConf(), args);
+
+ String[] remainingArgs = parser.getRemainingArgs();
+ if (remainingArgs.length != 3) {
+ usage();
+ return -1;
+ }
+ tableName = remainingArgs[0];
+ region1EncodedName = remainingArgs[1];
+ region2EncodedName = remainingArgs[2];
+ int status = 0;
+ if (region1EncodedName.equals(region2EncodedName)) {
+ LOG.error("Can't merge a region with itself");
+ status = -1;
+ }
+ return status;
+ }
+
+ private void usage() {
+ System.err.println("Usage: bin/hbase org.apache.hadoop.hbase.util.OnlineMerge"
+ + " [-force] [-async] [-show] <table-name> <region1-encoded-name> <region2-encoded-name>\n");
+ }
+
+ @SuppressWarnings("deprecation")
+ public int run(String[] args) throws Exception {
+ if (parseArgs(args) != 0) {
+ return -1;
+ }
+ HConnection connection = HConnectionManager.getConnection(getConf());
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ try {
+ if (sync) {
+ this.htd = connection.getHTableDescriptor(Bytes.toBytes(tableName));
+ if (htd == null) {
+ System.err.println("Couldn't get table descriptor");
+ return -1;
+ }
+ this.catalogTracker = new CatalogTracker(zkw, getConf(), connection);
+ this.catalogTracker.start();
+ this.initRegionInfoByEncodedName(catalogTracker);
+ if (this.mergedRegionInfo == null) {
+ System.err.println("Couldn't get region info by encoded name");
+ return -1;
+ }
+ }
+ String transactionName = MergeTransactionData.generateTransactionName(
+ tableName, region1EncodedName, region2EncodedName, isForce);
+ System.out.println("Send merge request " + transactionName);
+ RegionMergeManager.createMergeRequest(zkw, tableName, region1EncodedName,
+ region2EncodedName, isForce);
+ if (sync) {
+ String transactionNode = ZKUtil.joinZNode(zkw.mergeZNode,
+ transactionName);
+ System.out.print("Waiting for completion...");
+ while (true) {
+ boolean existed = ZKUtil.watchAndCheckExists(zkw, transactionNode);
+ if (!existed) {
+ break;
+ } else {
+ System.out.print(".");
+ Thread.sleep(1000);
+ }
+ }
+ System.out.println("");
+ Pair<HRegionInfo, ServerName> mergedRegionLocation = MetaReader
+ .getRegion(catalogTracker, mergedRegionInfo.getRegionName());
+ if (mergedRegionLocation != null) {
+ System.out.println("Complete merging, new region ="
+ + mergedRegionInfo.getRegionNameAsString()
+ + ", assigned on the server " + mergedRegionLocation.getSecond());
+ } else {
+ System.err.println("Merge is cancelled, please see the logs on the master");
+ }
+ }
+ } finally {
+ if (this.catalogTracker != null) {
+ this.catalogTracker.stop();
+ }
+ connection.close();
+ }
+ return 0;
+ }
+
+ private void setForce(boolean force) {
+ this.isForce = force;
+ }
+
+ private void setSync(boolean sync) {
+ this.sync = sync;
+ }
+
+ private void initRegionInfoByEncodedName(CatalogTracker ct)
+ throws IOException {
+ if (tableName != null && (region1Info == null || region2Info == null)) {
+ List<HRegionInfo> tableRegions = MetaReader.getTableRegions(ct,
+ Bytes.toBytes(tableName));
+ for (HRegionInfo regionInfo : tableRegions) {
+ if (regionInfo.getEncodedName().equals(region1EncodedName)) {
+ region1Info = regionInfo;
+ } else if (regionInfo.getEncodedName().equals(region2EncodedName)) {
+ region2Info = regionInfo;
+ }
+ if (region1Info != null && region2Info != null) {
+ this.mergedRegionInfo = HRegion.generateFixedRegionByMerge(this.htd,
+ this.region1Info, this.region2Info);
+ break;
+ }
+ }
+ }
+ }
+
+ static void showMergeRequests() throws Exception {
+ HConnection connection = HConnectionManager.getConnection(HBaseConfiguration.create());
+ try {
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ List<String> mergingRequests = ZKUtil.listChildrenNoWatch(zkw,
+ zkw.mergeZNode);
+ System.out.println("Handling merge requests:");
+ for (String mergeRequest : mergingRequests) {
+ System.out.println(mergeRequest);
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
+ public static void main(String[] args) {
+ int status;
+ OnlineMerge onlineMerge = new OnlineMerge();
+ try {
+ while (true) {
+ if (args.length >= 1 && "-force".equals(args[0])) {
+ String[] newArgs = new String[args.length - 1];
+ for (int i = 1; i < args.length; i++) {
+ newArgs[i - 1] = args[i];
+ }
+ args = newArgs;
+ onlineMerge.setForce(true);
+ } else if (args.length >= 1 && "-async".equals(args[0])) {
+ String[] newArgs = new String[args.length - 1];
+ for (int i = 1; i < args.length; i++) {
+ newArgs[i - 1] = args[i];
+ }
+ args = newArgs;
+ onlineMerge.setSync(false);
+ } else if (args.length >= 1 && "-show".equals(args[0])) {
+ showMergeRequests();
+ System.exit(0);
+ } else {
+ break;
+ }
+ }
+ status = ToolRunner.run(HBaseConfiguration.create(), onlineMerge, args);
+ } catch (Exception e) {
+ LOG.error("exiting due to error", e);
+ status = -1;
+ }
+ System.exit(status);
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java (working copy)
@@ -36,13 +36,17 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
@@ -110,6 +114,20 @@
* an ordered map of split parents.
*/
Pair<Integer, Map<HRegionInfo, Result>> getSplitParents() throws IOException {
+ return getSplitParents(null);
+ }
+
+ /**
+ * Scans META and returns a number of scanned rows, and an ordered map of
+ * split parents. if the given table name is null, return split parents of all
+ * tables, else only the specified table
+ * @param tableName null represents all tables
+ * @return pair of scanned rows, and map of split parent regioninfos
+ * @throws IOException
+ */
+ Pair<Integer, Map<HRegionInfo, Result>> getSplitParents(final byte[] tableName)
+ throws IOException {
+ final boolean isTableSpecified = (tableName != null && tableName.length != 0);
// TODO: Only works with single .META. region currently. Fix.
final AtomicInteger count = new AtomicInteger(0);
// Keep Map of found split parents. There are candidates for cleanup.
@@ -124,13 +142,21 @@
count.incrementAndGet();
HRegionInfo info = HRegionInfo.getHRegionInfo(r);
if (info == null) return true; // Keep scanning
+ if (isTableSpecified && Bytes.compareTo(info.getTableName(), tableName) > 0) {
+ // Another table, stop scanning
+ return false;
+ }
if (info.isSplitParent()) splitParents.put(info, r);
// Returning true means "keep scanning"
return true;
}
};
- // Run full scan of .META. catalog table passing in our custom visitor
- MetaReader.fullScan(this.server.getCatalogTracker(), visitor);
+
+ byte[] startRow = (!isTableSpecified) ? HConstants.EMPTY_START_ROW
+ : HRegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false);
+ // Run full scan of .META. catalog table passing in our custom visitor with
+ // the start row
+ MetaReader.fullScan(this.server.getCatalogTracker(), visitor, startRow);
return new Pair<Integer, Map<HRegionInfo, Result>>(count.get(), splitParents);
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -4223,6 +4223,119 @@
return dstRegion;
}
+ /**
+ * Generate a fixed region info when merging
+ * @param tabledesc
+ * @param a merging region A
+ * @param b merging region B
+ * @return the merged region info
+ * @throws IOException
+ */
+ public static HRegionInfo generateFixedRegionByMerge(
+ final HTableDescriptor tabledesc, final HRegionInfo a, final HRegionInfo b)
+ throws IOException {
+ if (!a.getTableNameAsString().equals(b.getTableNameAsString())) {
+ throw new IllegalArgumentException("Regions do not belong to the same table");
+ }
+ // Presume both are of same region type -- i.e. both user or catalog
+ // table regions. This way can use comparator.
+ final byte[] startKey = (a.getComparator().matchingRows(a.getStartKey(), 0,
+ a.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
+ HConstants.EMPTY_BYTE_ARRAY.length) || b.getComparator().matchingRows(
+ b.getStartKey(), 0, b.getStartKey().length,
+ HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)) ? HConstants.EMPTY_BYTE_ARRAY
+ : (a.getComparator().compareRows(a.getStartKey(), 0,
+ a.getStartKey().length, b.getStartKey(), 0, b.getStartKey().length) <= 0 ? a
+ .getStartKey() : b.getStartKey());
+ final byte[] endKey = (a.getComparator().matchingRows(a.getEndKey(), 0,
+ a.getEndKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
+ HConstants.EMPTY_BYTE_ARRAY.length) || a.getComparator().matchingRows(
+ b.getEndKey(), 0, b.getEndKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
+ HConstants.EMPTY_BYTE_ARRAY.length)) ? HConstants.EMPTY_BYTE_ARRAY : (a
+ .getComparator().compareRows(a.getEndKey(), 0, a.getEndKey().length,
+ b.getEndKey(), 0, b.getEndKey().length) <= 0 ? b.getEndKey() : a
+ .getEndKey());
+
+ HRegionInfo newRegionInfo = new HRegionInfo(tabledesc.getName(), startKey,
+ endKey, false, Math.max(a.getRegionId(), b.getRegionId()) + 1);
+ return newRegionInfo;
+ }
+
+ /**
+ * Merge two regions whether they are adjacent or not.
+ * @param fs
+ * @param rootDir
+ * @param tabledesc
+ * @param a merging region A
+ * @param b merging region B
+ * @return the merged region info
+ * @throws IOException
+ */
+ public static HRegionInfo merge(final FileSystem fs, final Path rootDir,
+ final HTableDescriptor tabledesc, final HRegionInfo a, final HRegionInfo b)
+ throws IOException {
+ HRegionInfo newRegionInfo = generateFixedRegionByMerge(tabledesc, a, b);
+ Path tableDir = HTableDescriptor.getTableDir(rootDir, a.getTableName());
+ String newRegionEncodedName = newRegionInfo.getEncodedName();
+ Path newRegionDir = HRegion.getRegionDir(tableDir, newRegionEncodedName);
+ if (!fs.exists(newRegionDir)) {
+ if (!fs.mkdirs(newRegionDir)) {
+ throw new IOException("Failed making dirs " + newRegionDir);
+ }
+ }
+
+ LOG.info("Starting merge of regions on fs : " + a.getRegionNameAsString()
+ + " and " + b.getRegionNameAsString() + " into new region "
+ + newRegionInfo.getRegionNameAsString() + " with start key <"
+ + Bytes.toStringBinary(newRegionInfo.getStartKey()) + "> and end key <"
+ + Bytes.toStringBinary(newRegionInfo.getEndKey()) + ">");
+
+ // Move HStoreFiles under new region directory
+ Map<byte[], List<Path>> byFamily = new TreeMap<byte[], List<Path>>(
+ Bytes.BYTES_COMPARATOR);
+ HRegionInfo[] regionInfos = new HRegionInfo[] { a, b };
+ for (HRegionInfo hri : regionInfos) {
+ for (HColumnDescriptor family : tabledesc.getFamilies()) {
+ Path familyDir = HStore.getStoreHomedir(tableDir, hri.getEncodedName(),
+ family.getName());
+ FileStatus files[] = fs.listStatus(familyDir);
+ if (files == null || files.length == 0) continue;
+ List<Path> familyFileList = byFamily.get(family.getName());
+ if (familyFileList == null) {
+ familyFileList = new ArrayList<Path>();
+ byFamily.put(family.getName(), familyFileList);
+ }
+ for (int i = 0; i < files.length; i++) {
+ if (StoreFile.isReference(files[i].getPath())) {
+ throw new IOException("Reference file still exists:"
+ + files[i].getPath());
+ }
+ familyFileList.add(files[i].getPath());
+ }
+ }
+ }
+
+ for (Map.Entry<byte[], List<Path>> es : byFamily.entrySet()) {
+ byte[] colFamily = es.getKey();
+ makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
+ // Because we compacted the source regions we should have no more than two
+ // HStoreFiles per family and there will be no reference store
+ List<Path> srcFiles = es.getValue();
+ for (Path hsf : srcFiles) {
+ fs.rename(hsf, StoreFile.getUniqueFile(fs, HStore.getStoreHomedir(
+ tableDir, newRegionInfo.getEncodedName(), colFamily)));
+ }
+ }
+ // Archive out the 'A' region
+ HFileArchiver.archiveRegion(fs, a);
+ // Archive out the 'B' region
+ HFileArchiver.archiveRegion(fs, b);
+
+ LOG.info("Merge completed on the fs. New region is "
+ + newRegionInfo.getRegionNameAsString());
+ return newRegionInfo;
+ }
+
/*
* Fills a map with a vector of store files keyed by column family.
* @param byFamily Map to fill.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (working copy)
@@ -92,6 +92,11 @@
@Override
public void process() {
LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
+ // Check if this region is in merging or not
+ if (this.assignmentManager.isRegionInMerging(this.regionInfo)) {
+ assignmentManager.offlineMergingRegion(regionInfo);
+ return;
+ }
// Check if this table is being disabled or not
if (this.assignmentManager.getZKTable().
isDisablingOrDisabledTable(this.regionInfo.getTableNameAsString())) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionMergeManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionMergeManager.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionMergeManager.java (revision 0)
@@ -0,0 +1,724 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.MergeTransaction.JournalState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Receive and handle merge request via zk, manage merge transaction and merging
+ * region list, resubmit the failed merges by a period
+ */
+public class RegionMergeManager extends ZooKeeperListener {
+ private static final Log LOG = LogFactory.getLog(RegionMergeManager.class);
+
+ private MasterServices master;
+ private final AssignmentManager assignmentManager;
+ private final CatalogJanitor catalogJanitor;
+ private final Configuration conf;
+
+ // A set used to store the regions which are merging; A region in merging
+ // means it's disabled, isn't online anywhere and won't be assigned any more;
+ private final ConcurrentSkipListSet<String> mergingRegions;
+ // Set of regions that are already split; only add a region to this set if it
+ // is also in mergingRegions
+ private final ConcurrentSkipListSet<String> splitRegions;
+
+ // Map of merge transactions in processing where the key is the transaction name
+ private final ConcurrentHashMap<String, MergeTransactionData> processingMergeTransaction;
+
+ // Queue of merge transactions which are waiting for resubmit
+ private final BlockingQueue<MergeTransactionData> resubmissionQueue;
+
+ /** Resubmit thread schedule pool */
+ private ScheduledExecutorService resubmitSchedulePool;
+ // Running interval of resubmit thread, 60s as default
+ private final int resubmitThreadPeriod;
+ static final int DEFAULT_RESUBMIT_PERIOD = 60;
+
+ // Used for wait signal
+ private final Object present_object = new Object();
+
+ private final int timePerSleep = 10;
+ // 120s
+ private static final int DEFAULT_TIME_OUT = 120000;
+ private final int timeout;
+ /**
+ * If it is the first time we called {@link RegionMergeManager#fetchMergeRequest()},
+ * we should make sure old merge transactions handled first
+ */
+ private boolean isFirstFetchRequest = true;
+
+ // Thread pool used to run merge request
+ private ExecutorService mergesPool;
+ private final int mergeThreads;
+
+ /**
+ * Constructs a new merge manager.
+ * @param master
+ * @param catalogJanitor
+ * @param watcher
+ * @throws KeeperException
+ * @throws IOException
+ */
+ public RegionMergeManager(final MasterServices master, ZooKeeperWatcher watcher)
+ throws KeeperException, IOException {
+ super(watcher);
+ this.master = master;
+ this.conf = master.getConfiguration();
+ this.timeout = conf.getInt("hbase.merge.wait.timeout.millis", DEFAULT_TIME_OUT);
+ this.catalogJanitor = new CatalogJanitor(master, master);
+ this.assignmentManager = master.getAssignmentManager();
+ this.mergingRegions = new ConcurrentSkipListSet<String>();
+ this.splitRegions = new ConcurrentSkipListSet<String>();
+ this.processingMergeTransaction = new ConcurrentHashMap<String, MergeTransactionData>();
+ this.resubmissionQueue = new LinkedBlockingQueue<MergeTransactionData>();
+ this.mergeThreads = conf.getInt("hbase.merge.thread.count", 1);
+ this.resubmitThreadPeriod = conf.getInt("hbase.merge.resubmit.period", DEFAULT_RESUBMIT_PERIOD);
+ this.assignmentManager.setMergeManager(this);
+
+ }
+
+ /**
+ * Submit the merge request to the pool
+ * @param transactionData
+ * @param initialState
+ * @throws IOException
+ */
+ private void requestMerge(MergeTransactionData transactionData,
+ JournalState initialState) {
+ try {
+ this.mergesPool.execute(new MergeRequest(this, transactionData,
+ initialState, master));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Merge requested for " + transactionData + ",initialState= "
+ + initialState);
+ }
+ } catch (RejectedExecutionException ree) {
+ LOG.warn("Could not execute merge for " + transactionData, ree);
+ }
+ }
+
+ /**
+ * Handle the merge transaction data from ZK
+ * @param transactionData
+ * @throws KeeperException
+ * @throws IOException
+ */
+ synchronized private void handleMergeTransactionData(
+ MergeTransactionData transactionData) throws KeeperException {
+ String regionA = transactionData.getRegionA();
+ String regionB = transactionData.getRegionB();
+ if (mergingRegions.contains(regionA) || mergingRegions.contains(regionB)) {
+ LOG.warn("Merge request " + transactionData + " tries to merge region "
+ + (mergingRegions.contains(regionA) ? regionA : regionB)
+ + " which another merge transaction already contains");
+ finishMergeTransaction(transactionData, false);
+ } else if (regionA.equals(regionB)) {
+ LOG.warn("Error merge request:" + transactionData
+ + ",shouldn't merge the same region");
+ finishMergeTransaction(transactionData, false);
+ } else {
+ String transactionNode = ZKUtil.joinZNode(watcher.mergeZNode,
+ transactionData.getTransactionName());
+ byte[] data = ZKUtil.getData(watcher, transactionNode);
+ JournalState initialState = MergeTransaction
+ .parseJournalStateFromBytes(data);
+ mergingRegions.add(regionA);
+ mergingRegions.add(regionB);
+ requestMerge(transactionData, initialState);
+ }
+ }
+
+ HRegionInfo getRegionInfoByEncodedName(String tableName,
+ String regionEncodeName) throws IOException {
+ RegionState regionState = this.assignmentManager.getRegionStates()
+ .getRegionState(regionEncodeName);
+ if (regionState != null) {
+ return regionState.getRegion();
+ }
+ List<HRegionInfo> tableRegions = MetaReader.getTableRegions(
+ this.master.getCatalogTracker(), Bytes.toBytes(tableName));
+ for (HRegionInfo regionInfo : tableRegions) {
+ if (regionInfo.getEncodedName().equals(regionEncodeName)) {
+ return regionInfo;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Starts the thread pool and the tracking of merge request.
+ *
+ * @throws KeeperException
+ * @throws IOException
+ */
+ public void start() throws KeeperException, IOException {
+ this.mergesPool = Executors.newFixedThreadPool(
+ mergeThreads,
+ new ThreadFactoryBuilder()
+ .setNameFormat("MergeManager-Merge-Thread #%d").setDaemon(true)
+ .build());
+
+ this.resubmitSchedulePool = Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("MergeManager-Resubmit-Thread #%d").setDaemon(true)
+ .build());
+ resubmitSchedulePool.scheduleAtFixedRate(new ResubmitThread(this,
+ this.resubmissionQueue), resubmitThreadPeriod, resubmitThreadPeriod,
+ TimeUnit.SECONDS);
+ watcher.registerListener(this);
+ fetchMergeRequest();
+ }
+
+ /**
+ * Shutdown the threadpools
+ */
+ public void stop() {
+ if (mergesPool != null) {
+ this.mergesPool.shutdown();
+ this.mergesPool = null;
+ }
+ if (this.resubmitSchedulePool != null) {
+ this.resubmitSchedulePool.shutdown();
+ this.resubmitSchedulePool = null;
+ }
+ }
+
+ /**
+ * Thread used to resubmit failed merge transaction
+ */
+ private static class ResubmitThread extends Thread {
+ RegionMergeManager mergeManager;
+ BlockingQueue<MergeTransactionData> resubmitQueue;
+
+ public ResubmitThread(RegionMergeManager mergeManager,
+ BlockingQueue<MergeTransactionData> resubmitQueue) {
+ super("MergeManager.ResubmitThread");
+ this.mergeManager = mergeManager;
+ this.resubmitQueue = resubmitQueue;
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ MergeTransactionData transactionData = null;
+ try {
+ while ((transactionData = resubmitQueue.poll()) != null) {
+ String transactionNode = ZKUtil.joinZNode(
+ mergeManager.watcher.mergeZNode,
+ transactionData.getTransactionName());
+ byte[] data = ZKUtil.getData(mergeManager.watcher, transactionNode);
+ JournalState initialState = MergeTransaction
+ .parseJournalStateFromBytes(data);
+ mergeManager.requestMerge(transactionData, initialState);
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Failed resubmit transaction " + transactionData, e);
+ if (transactionData != null) {
+ mergeManager.addToResubmitQueue(transactionData);
+ }
+ }
+ }
+ }
+
+ /**
+ * Fetch the merge request via zk
+ * @throws KeeperException
+ * @throws IOException
+ */
+ synchronized private void fetchMergeRequest() throws KeeperException {
+ List<String> mergeTransactions = ZKUtil.listChildrenAndWatchForNewChildren(
+ watcher, watcher.mergeZNode);
+ if (isFirstFetchRequest) {
+ isFirstFetchRequest = false;
+ List<String> oldMergeTransactions = new ArrayList<String>();
+ List<String> newMergeTransactions = new ArrayList<String>();
+ for (String transactionName : mergeTransactions) {
+ String transactionNode = ZKUtil.joinZNode(watcher.mergeZNode,
+ transactionName);
+ byte[] data = ZKUtil.getData(watcher, transactionNode);
+ JournalState initialState = MergeTransaction
+ .parseJournalStateFromBytes(data);
+ if (initialState != JournalState.CREATE_MERGE) {
+ oldMergeTransactions.add(transactionName);
+ } else {
+ newMergeTransactions.add(transactionName);
+ }
+ }
+ mergeTransactions.clear();
+ mergeTransactions.addAll(oldMergeTransactions);
+ mergeTransactions.addAll(newMergeTransactions);
+ }
+ for (String transactionName : mergeTransactions) {
+ if (!MergeTransactionData.isValidTransactionName(transactionName)) {
+ LOG.warn("Invalid merge transaction name=" + transactionName
+ + ", clean it");
+ ZKUtil.deleteNodeRecursively(watcher,
+ ZKUtil.joinZNode(watcher.mergeZNode, transactionName));
+ } else if (this.processingMergeTransaction.containsKey(transactionName)) {
+ continue;
+ } else {
+ MergeTransactionData transactionData = new MergeTransactionData(
+ transactionName);
+ this.processingMergeTransaction.put(transactionName, transactionData);
+ handleMergeTransactionData(transactionData);
+ }
+ }
+ }
+
+ /**
+ * Finish the merge transaction: 1. delete the transaction znode; 2. if the
+ * request was actually submitted, drop both regions from the merging and
+ * split tracking sets so they become eligible for normal handling again.
+ * @param transactionData descriptor of the transaction to finish
+ * @param successfulSubmission true if it is called by the MergeRequest
+ *        thread after the request was submitted (triggers set cleanup)
+ * @throws KeeperException on unexpected ZooKeeper failure
+ */
+ void finishMergeTransaction(MergeTransactionData transactionData,
+ boolean successfulSubmission) throws KeeperException {
+ // Remove the per-transaction znode (recursively, in case of children).
+ ZKUtil.deleteNodeRecursively(watcher,
+ ZKUtil.joinZNode(watcher.mergeZNode,
+ transactionData.getTransactionName()));
+ this.processingMergeTransaction
+ .remove(transactionData.getTransactionName());
+ if (successfulSubmission) {
+ String regionA = transactionData.getRegionA();
+ String regionB = transactionData.getRegionB();
+ // Both regions leave the merging state; also clear any split markers
+ // recorded for them while the merge was pending.
+ this.mergingRegions.remove(regionA);
+ this.mergingRegions.remove(regionB);
+ this.splitRegions.remove(regionA);
+ this.splitRegions.remove(regionB);
+ }
+ }
+
+ /**
+ * Add a failed transaction to the resubmission queue so the resubmit
+ * thread can retry it later.
+ * @param transactionData transaction to retry
+ */
+ void addToResubmitQueue(MergeTransactionData transactionData) {
+ this.resubmissionQueue.add(transactionData);
+ }
+
+ /**
+ * Used in test.
+ * NOTE(review): method name has a typo ("Queus"); kept as-is for
+ * compatibility with existing callers.
+ * @return resubmit queue size
+ */
+ int resubmitQueusSize() {
+ return this.resubmissionQueue.size();
+ }
+
+ /**
+ * Used in test. Invokes run() (not start()) so the resubmission logic
+ * executes synchronously on the calling thread, keeping tests
+ * deterministic.
+ */
+ void triggerResubmissionForTest() {
+ new ResubmitThread(this, this.resubmissionQueue).run();
+ }
+
+ /**
+ * Assign the merging region if it is offline, called when cancelling the
+ * merge. Always removes the region from the merging set afterwards.
+ * @param regionInfo region being released from the merge
+ */
+ void assignMergingRegionIfOffline(HRegionInfo regionInfo) {
+ String tableName = regionInfo.getTableNameAsString();
+ ZKTable zkTable = assignmentManager.getZKTable();
+ // Only assign when the table still exists and is not (being) disabled.
+ if (zkTable.isTablePresent(tableName)
+ && !zkTable.isDisablingOrDisabledTable(tableName)) {
+ // Never assign a region that has split while the merge was pending.
+ if (isRegionOffline(regionInfo) && !isRegionSplit(regionInfo)) {
+ assignmentManager.assign(regionInfo, true);
+ }
+ }
+ this.mergingRegions.remove(regionInfo.getEncodedName());
+ }
+
+ /**
+ * Assign the merged region if it is offline, called after adding merged
+ * region to META.
+ * @param mergedRegionInfo the newly created merged region
+ * @throws InterruptedException if interrupted while waiting on table state
+ */
+ void assignMergedRegionIfOffline(HRegionInfo mergedRegionInfo)
+ throws InterruptedException {
+ String tableName = mergedRegionInfo.getTableNameAsString();
+ ZKTable zkTable = assignmentManager.getZKTable();
+ // Avoid racing with table-enable, which assigns regions itself.
+ waitUntilTableNotInEnabling(zkTable, tableName);
+ if (zkTable.isTablePresent(tableName)
+ && !zkTable.isDisablingOrDisabledTable(tableName)) {
+ if (isRegionOffline(mergedRegionInfo)) {
+ assignmentManager.assign(mergedRegionInfo, true);
+ }
+ }
+ }
+
+ /**
+ * Wait until both merging regions are offline, giving up when the
+ * configured timeout expires or the master stops.
+ * @param regionA first merging region
+ * @param regionB second merging region
+ * @throws InterruptedException if interrupted while waiting
+ */
+ void waitUntilRegionsOffline(HRegionInfo regionA, HRegionInfo regionB)
+ throws InterruptedException {
+ long expireTime = EnvironmentEdgeManager.currentTimeMillis() + this.timeout;
+ while (!this.master.isStopped() && EnvironmentEdgeManager.currentTimeMillis() < expireTime) {
+ if (isRegionOffline(regionA) && isRegionOffline(regionB)) {
+ return;
+ }
+ // Sleep until woken early by notifyRegionOffline()/notifyRegionSplit().
+ synchronized (present_object) {
+ present_object.wait(timePerSleep);
+ }
+ }
+ // Reached on timeout OR master stop; say which regions were involved.
+ throw new RuntimeException("Timeout or master stopped while waiting for regions "
+ + regionA.getEncodedName() + " and " + regionB.getEncodedName()
+ + " to go offline");
+ }
+
+ /**
+ * Wait until master is initialized. Polls every timePerSleep ms with no
+ * upper bound; the loop only terminates when the master initializes or
+ * stops.
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ void waitUntilMasterInitialized() throws InterruptedException {
+ while (!this.master.isStopped()) {
+ if (((HMaster) this.master).isInitialized()) {
+ return;
+ }
+ Thread.sleep(timePerSleep);
+ }
+ // There is no timeout in this loop: the only way here is a stopped master.
+ throw new RuntimeException("Master stopped while waiting for initialization");
+ }
+
+ /**
+ * Wait until no dead server is in process. Polls every timePerSleep ms
+ * with no upper bound; the loop only terminates when dead-server
+ * processing finishes or the master stops.
+ * @param masterServices master services used to reach the ServerManager
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ void waitUntilNoDeadServerInProcess(MasterServices masterServices)
+ throws InterruptedException {
+ while (!this.master.isStopped()) {
+ if (!masterServices.getServerManager().areDeadServersInProgress()) {
+ return;
+ }
+ Thread.sleep(timePerSleep);
+ }
+ // There is no timeout in this loop: the only way here is a stopped master.
+ throw new RuntimeException("Master stopped while waiting for dead server processing to finish");
+ }
+
+ /**
+ * Wait until the specified table is no longer in the enabling state, the
+ * timeout expires, or the master stops.
+ * @param zkTable table state tracker
+ * @param tableName table to watch
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ void waitUntilTableNotInEnabling(ZKTable zkTable, String tableName)
+ throws InterruptedException {
+ long expireTime = EnvironmentEdgeManager.currentTimeMillis() + this.timeout;
+ while (!this.master.isStopped() && EnvironmentEdgeManager.currentTimeMillis() < expireTime) {
+ if (!zkTable.isEnablingTable(tableName)) {
+ return;
+ }
+ Thread.sleep(timePerSleep);
+ }
+ throw new RuntimeException("Timeout waiting table " + tableName + " not in enabling");
+ }
+
+ /**
+ * Check if the merging region is split.
+ * @param regionInfo region to check
+ * @return true if the region is a split parent or a split was reported for
+ * it while it was tracked as merging
+ */
+ boolean isRegionSplit(HRegionInfo regionInfo) {
+ // Either META already marks it as a split parent, or notifyRegionSplit()
+ // recorded a split for it while the merge was pending.
+ return regionInfo.isSplitParent()
+ || this.splitRegions.contains(regionInfo.getEncodedName());
+ }
+
+ /**
+ * Check if the merging region is offline.
+ * @param regionInfo region to check
+ * @return true if the region is not in transition and no server hosts it
+ */
+ boolean isRegionOffline(HRegionInfo regionInfo) {
+ // Offline means: not moving anywhere AND not hosted anywhere.
+ return !this.assignmentManager.getRegionStates().isRegionInTransition(regionInfo)
+ && this.assignmentManager.getRegionStates().getRegionServerOfRegion(regionInfo) == null;
+ }
+
+ /**
+ * Record the transaction's JournalState on its ZK node.
+ * @param transactionName name of the transaction znode
+ * @param state journal state to persist
+ * @throws NoNodeException if the transaction znode no longer exists
+ * @throws KeeperException on other unexpected ZooKeeper failures
+ */
+ void setJournalStateOnZK(String transactionName, JournalState state)
+ throws NoNodeException, KeeperException {
+ String transactionNode = ZKUtil.joinZNode(watcher.mergeZNode,
+ transactionName);
+ ZKUtil.setData(watcher, transactionNode,
+ MergeTransaction.getBytesFromJournalState(state));
+ }
+
+ /**
+ * Clean parent region of merging regions in prepare step; delegates to the
+ * CatalogJanitor.
+ * @param parent split parent region to clean
+ * @param rowContent META row content of the parent
+ * @return result of CatalogJanitor.cleanParent (true when cleaned)
+ * @throws IOException on META/filesystem access failure
+ */
+ boolean cleanParent(final HRegionInfo parent, Result rowContent)
+ throws IOException {
+ return this.catalogJanitor.cleanParent(parent, rowContent);
+ }
+
+ /**
+ * Get split parent regions for a specified table, as reported by the
+ * CatalogJanitor.
+ * @param tableName table to scan for split parents
+ * @return map of split parent regioninfos to their META rows
+ * @throws IOException on META access failure
+ */
+ Map<HRegionInfo, Result> getSplitParents(final byte[] tableName)
+ throws IOException {
+ // NOTE(review): type parameters restored from CatalogJanitor's
+ // getSplitParents() Pair second member -- confirm against that API.
+ return this.catalogJanitor.getSplitParents(tableName).getSecond();
+ }
+
+ /**
+ * Check whether a region is in merging, if true it shouldn't be assigned.
+ * @param regionInfo region to check
+ * @return true if the region is currently tracked as merging
+ */
+ boolean isRegionInMerging(final HRegionInfo regionInfo) {
+ return this.mergingRegions.contains(regionInfo.getEncodedName());
+ }
+
+ /**
+ * Check if any regions are merging.
+ * @return true if at least one region is tracked as merging
+ */
+ boolean isAnyRegionInMerging() {
+ return !this.mergingRegions.isEmpty();
+ }
+
+ /**
+ * Get a defensive snapshot of the encoded names of all merging regions.
+ * @return copy of the merging-region set; callers may mutate it freely
+ */
+ public Set<String> getMergingRegions() {
+ Set<String> clone = new HashSet<String>(this.mergingRegions.size());
+ clone.addAll(this.mergingRegions);
+ return clone;
+ }
+
+ /**
+ * Get the merging region num for the specified table. Each pending
+ * transaction contributes its two source regions to the count.
+ * @param tableName table to count merging regions for
+ * @return number of merging regions of given table
+ */
+ public int getNumOfMergingRegionsForTable(String tableName) {
+ int num = 0;
+ for (MergeTransactionData transactionData : processingMergeTransaction
+ .values()) {
+ if (tableName.equals(transactionData.getTableName())) {
+ num += 2;
+ }
+ }
+ return num;
+ }
+
+ /**
+ * AssignmentManager notifies MergeManager one region is offline. Wakes
+ * threads blocked in waitUntilRegionsOffline().
+ * @param regionInfo the region that just went offline
+ */
+ void notifyRegionOffline(final HRegionInfo regionInfo) {
+ if (this.mergingRegions.contains(regionInfo.getEncodedName())) {
+ synchronized (present_object) {
+ // Use notifyAll (not notify): several merge transactions may wait
+ // concurrently, matching notifyRegionSplit().
+ present_object.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * AssignmentManager notify MergeManager one region is split. Records the
+ * split (so the merge can be cancelled) and wakes all waiters.
+ * @param regionInfo the region that split
+ */
+ void notifyRegionSplit(final HRegionInfo regionInfo) {
+ if (this.mergingRegions.contains(regionInfo.getEncodedName())) {
+ this.splitRegions.add(regionInfo.getEncodedName());
+ synchronized (present_object) {
+ present_object.notifyAll();
+ }
+ }
+ }
+
+ // ZooKeeper events
+
+ /**
+ * New transaction node has been created or deleted.
+ *
+ * When this happens we must:
+ *
+ * - Find the new merge request and handle it
+ *
+ */
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (path.equals(watcher.mergeZNode)) {
+ try {
+ LOG.debug("Merge request arrived or completed");
+ fetchMergeRequest();
+ } catch (KeeperException e) {
+ LOG.error("Unexpected ZK exception handling merge requests", e);
+ } catch (Throwable t) {
+ // Watcher callbacks must never propagate: log and keep watching.
+ LOG.error("Unhandled exception: " + t.getMessage(), t);
+ }
+ }
+ }
+
+ /**
+ * Immutable descriptor of one merge request, parsed from the transaction
+ * znode name: tableName#regionAEncodedName#regionBEncodedName#force.
+ */
+ public static class MergeTransactionData {
+ public static final String SEPARATOR = "#";
+ // Full znode name the data was parsed from.
+ private final String transactionName;
+ private final String tableName;
+ // Encoded names of the two regions to merge.
+ private final String regionA;
+ private final String regionB;
+ // When true, non-adjacent regions may be merged.
+ private final boolean isForce;
+
+ MergeTransactionData(String transactionName) {
+ this.transactionName = transactionName;
+ // Format is checked by isValidTransactionName() before construction.
+ String[] elements = transactionName.split(SEPARATOR);
+ assert elements.length == 4;
+ this.tableName = elements[0];
+ this.regionA = elements[1];
+ this.regionB = elements[2];
+ this.isForce = Boolean.parseBoolean(elements[3]);
+ }
+
+ String getTransactionName() {
+ return this.transactionName;
+ }
+
+ String getTableName() {
+ return this.tableName;
+ }
+
+ String getRegionA() {
+ return this.regionA;
+ }
+
+ String getRegionB() {
+ return this.regionB;
+ }
+
+ boolean isForce() {
+ return this.isForce;
+ }
+
+ @Override
+ public String toString() {
+ return "table=" + tableName + ", regionA=" + regionA + ", regionB="
+ + regionB + ", force=" + isForce;
+ }
+
+ /**
+ * Valid TransactionName=tableName+'#'+AEncodedName+"#"+BEncodedName+"#"+
+ * "true/false"
+ * @param transactionName candidate znode name to validate
+ * @return true if the name encodes a well-formed merge request
+ */
+ public static boolean isValidTransactionName(String transactionName) {
+ String[] elements = transactionName.split(SEPARATOR);
+ if (elements.length != 4) {
+ return false;
+ }
+ // Encoded region names have a fixed length; mismatch means corruption.
+ if (elements[1].length() != elements[2].length()) {
+ return false;
+ }
+ // A region cannot be merged with itself.
+ if (elements[1].equals(elements[2])) {
+ return false;
+ }
+ if (!elements[3].equals("true") && !elements[3].equals("false")) {
+ return false;
+ }
+ return true;
+ }
+
+ public static String generateTransactionName(String tableName,
+ String regionAEncodedName, String regionBEncodedName, boolean force) {
+ return tableName + SEPARATOR + regionAEncodedName + SEPARATOR
+ + regionBEncodedName + SEPARATOR + force;
+ }
+ }
+
+ /**
+ * Create a merge request via ZK. Set force to true to allow merging two
+ * non-adjacent regions; otherwise the request will be skipped when the
+ * regions are not adjacent.
+ * @param zkw zk watcher used to create the request node
+ * @param tableName table the regions belong to
+ * @param regionAEncodedName encoded name of the first region
+ * @param regionBEncodedName encoded name of the second region
+ * @param force if it is false, only merge two adjacent regions
+ * @throws KeeperException on unexpected ZooKeeper failure
+ */
+ public static void createMergeRequest(ZooKeeperWatcher zkw, String tableName,
+ String regionAEncodedName, String regionBEncodedName, boolean force)
+ throws KeeperException {
+ String transactionName = MergeTransactionData.generateTransactionName(
+ tableName, regionAEncodedName, regionBEncodedName, force);
+ if (MergeTransactionData.isValidTransactionName(transactionName)) {
+ // Node data carries the initial journal state for crash recovery.
+ String transactionNode = ZKUtil
+ .joinZNode(zkw.mergeZNode, transactionName);
+ ZKUtil.createAndNoWatch(zkw, transactionNode,
+ MergeTransaction.getBytesFromJournalState(JournalState.CREATE_MERGE));
+ } else {
+ throw new IllegalArgumentException("Unknown merge parameters, table="
+ + tableName + ", regionA=" + regionAEncodedName + ", regionB="
+ + regionBEncodedName);
+ }
+
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy)
@@ -103,6 +103,8 @@
public String splitLogZNode;
// znode containing the state of the load balancer
public String balancerZNode;
+ // znode used for merge
+ public String mergeZNode;
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE =
@@ -166,6 +168,7 @@
ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode);
ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
+ ZKUtil.createAndFailSilent(this, mergeZNode);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException(
prefix("Unexpected KeeperException creating base node"), e);
@@ -215,6 +218,8 @@
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
balancerZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.balancer", "balancer"));
+ mergeZNode = ZKUtil.joinZNode(baseZNode,
+ conf.get("zookeeper.znode.merge", "merge"));
}
/**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy)
@@ -1043,6 +1043,35 @@
}
/**
+ * Creates the specified node with the specified data. Does not set a watch.
+ *
+ *
+ * Throws an exception if the node already exists.
+ *
+ *
+ * The node created is persistent and open access.
+ *
+ *
+ * Returns normally once the node has been created (the method itself returns no value).
+ *
+ * @param zkw zk reference
+ * @param znode path of node to create
+ * @param data data of node to create
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws KeeperException.NodeExistsException if node already exists
+ */
+ public static void createAndNoWatch(ZooKeeperWatcher zkw, String znode,
+ byte[] data) throws KeeperException, KeeperException.NodeExistsException {
+ try {
+ waitForZKConnectionIfAuthenticating(zkw);
+ // Persistent node, no watch set; ACLs derived from the znode path.
+ zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
+ CreateMode.PERSISTENT);
+ } catch (InterruptedException e) {
+ // Interrupt handling (re-interrupt + conversion) delegated to watcher.
+ zkw.interruptedException(e);
+ }
+ }
+
+ /**
* Async creates the specified node with the specified data.
*
*
Throws an exception if the node already exists.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeRequest.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeRequest.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeRequest.java (revision 0)
@@ -0,0 +1,238 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.master.MergeTransaction.JournalState;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing region merges. Put in a queue, owned by MergeManager.
+ */
+class MergeRequest implements Runnable {
+ static final Log LOG = LogFactory.getLog(MergeRequest.class);
+
+ private final MergeTransactionData transactionData;
+ private JournalState initialState;
+ private final RegionMergeManager mergeManager;
+ private final MasterServices masterServices;
+
+ private static Class extends MergeTransaction> mergeTransactionClass;
+
+ MergeRequest(final RegionMergeManager mergeManager,
+ final MergeTransactionData transactionData, JournalState initialState,
+ MasterServices masterServices) {
+ Preconditions.checkNotNull(masterServices);
+ this.transactionData = transactionData;
+ this.initialState = initialState;
+ this.mergeManager = mergeManager;
+ this.masterServices = masterServices;
+ }
+
+ @Override
+ public void run() {
+ if (this.masterServices.isStopped()) {
+ LOG.debug("Skipping merge because server is stopped");
+ return;
+ }
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Merging " + transactionData + " with initialState=" + initialState);
+ try {
+ // Merges must not race with master startup region assignment.
+ mergeManager.waitUntilMasterInitialized();
+ HRegionInfo regionInfoA = this.mergeManager.getRegionInfoByEncodedName(
+ transactionData.getTableName(), transactionData.getRegionA());
+ HRegionInfo regionInfoB = this.mergeManager.getRegionInfoByEncodedName(
+ transactionData.getTableName(), transactionData.getRegionB());
+ if ((regionInfoA == null || regionInfoB == null)) {
+ if (!masterServices.getAssignmentManager().getZKTable()
+ .isTablePresent(transactionData.getTableName())) {
+ String msg = "Skip merging " + transactionData
+ + ", because table is not present now";
+ LOG.info(msg);
+ status.abort(msg);
+ skipRequest(regionInfoA, regionInfoB);
+ return;
+ } else if (this.initialState == JournalState.CREATE_MERGE) {
+ // Nothing has been executed yet, so dropping the request is safe.
+ String msg = "Skip merging " + transactionData
+ + ", because couldn't get the regioninfo, initialState="
+ + initialState;
+ LOG.info(msg);
+ status.abort(msg);
+ skipRequest(regionInfoA, regionInfoB);
+ return;
+ } else if (this.initialState != JournalState.COMPLETE_MERGING) {
+ // Partially executed merge: must be resubmitted, not dropped.
+ String msg = "Resubmit merging, because couldn't get the regioninfo, and initialState="
+ + initialState;
+ LOG.warn(msg);
+ status.abort(msg);
+ mergeManager.addToResubmitQueue(transactionData);
+ return;
+ }
+ } else {
+ if (!transactionData.isForce() && !areAdjacent(regionInfoA, regionInfoB)) {
+ String msg = "Skip merging " + transactionData
+ + ", because two region are not adjacent, regionA="
+ + regionInfoA.getRegionNameAsString() + ",regionB="
+ + regionInfoB.getRegionNameAsString();
+ LOG.info(msg);
+ status.abort(msg);
+ skipRequest(regionInfoA, regionInfoB);
+ return;
+ }
+ }
+ status.setStatus("Starting merging");
+ LOG.info("Start merging " + transactionData + ", initialState="
+ + initialState);
+ MergeTransaction mt = null;
+ // Retry transient IOExceptions, resuming each attempt from the journal
+ // state the previous attempt reached.
+ int triesNumber = masterServices.getConfiguration().getInt(
+ "hbase.merge.number.tries", 5);
+ for (int i = 0; i < triesNumber; i++) {
+ try {
+ mt = createMergeTransaction(masterServices.getConfiguration(),
+ masterServices, mergeManager, transactionData, regionInfoA,
+ regionInfoB, initialState, status);
+ mt.execute(this.masterServices);
+ break;
+ } catch (IOException e) {
+ if (i == triesNumber - 1) {
+ throw e;
+ }
+ LOG.warn("Failed executing merge, " + transactionData + ", try="
+ + (i + 1), e);
+ if (mt != null) {
+ this.initialState = mt.getCurrentState();
+ }
+ }
+ }
+ // mt is non-null whenever triesNumber >= 1: a failure on the last try
+ // is rethrown above.
+ HRegionInfo mergedRegion = mt.getMergedRegion();
+ String msg = "Complete merging " + transactionData
+ + ", successful=" + mt.isSuccessful()
+ + (mergedRegion == null ? "" : ", merged region="
+ + mergedRegion.getRegionNameAsString());
+ LOG.info(msg);
+ status.markComplete(msg);
+ } catch (InterruptedException e) {
+ String msg = "Merging " + transactionData + " interrupted by user";
+ LOG.warn(msg, e);
+ status.abort(msg);
+ // Preserve the interrupt for the owning executor.
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ String msg = "IOException in merging " + transactionData;
+ LOG.warn(msg, e);
+ status.abort(msg);
+ // Only worth retrying when the filesystem itself is still healthy.
+ if (masterServices.getMasterFileSystem().checkFileSystem()) {
+ this.mergeManager.addToResubmitQueue(transactionData);
+ }
+ } catch (KeeperException e) {
+ String msg = "KeeperException in merging " + transactionData;
+ LOG.error(msg, e);
+ status.abort(msg);
+ // ZK trouble is fatal for the master: abort rather than limp on.
+ masterServices.abort(msg, e);
+ } catch (Throwable e) {
+ String msg = "Unknown Exception in merging " + transactionData;
+ LOG.error(msg, e);
+ status.abort(msg);
+ this.mergeManager.addToResubmitQueue(transactionData);
+ } finally {
+ status.cleanup();
+ }
+ }
+
+ /**
+ * Skip the current merge request: re-assign any region the merge had taken
+ * offline and clean up the transaction state.
+ * @param regionInfoA first region; may be null when it could not be resolved
+ * @param regionInfoB second region; may be null when it could not be resolved
+ * @throws InterruptedException if interrupted while waiting
+ * @throws KeeperException on unexpected ZooKeeper failure
+ */
+ private void skipRequest(HRegionInfo regionInfoA, HRegionInfo regionInfoB)
+ throws InterruptedException, KeeperException {
+ if (regionInfoA != null) {
+ this.mergeManager.assignMergingRegionIfOffline(regionInfoA);
+ }
+ if (regionInfoB != null) {
+ this.mergeManager.assignMergingRegionIfOffline(regionInfoB);
+ }
+ this.mergeManager.finishMergeTransaction(transactionData, true);
+ }
+
+ /**
+ * Check whether two regions are adjacent in the table's key space.
+ * @param regionA one region
+ * @param regionB the other region
+ * @return true if the end key of the lower region equals the start key of
+ * the higher one
+ */
+ public static boolean areAdjacent(HRegionInfo regionA, HRegionInfo regionB) {
+ // Order the pair by start key, then compare the shared boundary.
+ HRegionInfo lower = regionA;
+ HRegionInfo higher = regionB;
+ if (Bytes.compareTo(lower.getStartKey(), higher.getStartKey()) > 0) {
+ lower = regionB;
+ higher = regionA;
+ }
+ return Bytes.compareTo(lower.getEndKey(), higher.getStartKey()) == 0;
+ }
+
+ /**
+ * Utility for constructing an instance of the configured MergeTransaction
+ * class ("hbase.master.merge.transaction.impl", default MergeTransaction).
+ * @param conf configuration the implementation class is read from
+ * @param masterServices master services handle
+ * @param mergeManager owning merge manager
+ * @param transactionData descriptor of the merge
+ * @param mergeA first region to merge
+ * @param mergeB second region to merge
+ * @param initialState journal state to resume from
+ * @param status monitored task for progress reporting
+ * @return instance of MergeTransaction class
+ */
+ static MergeTransaction createMergeTransaction(final Configuration conf,
+ final MasterServices masterServices, final RegionMergeManager mergeManager,
+ final MergeTransactionData transactionData, final HRegionInfo mergeA,
+ final HRegionInfo mergeB, final JournalState initialState,
+ final MonitoredTask status) {
+ if (mergeTransactionClass == null) {
+ mergeTransactionClass = conf.getClass(
+ "hbase.master.merge.transaction.impl", MergeTransaction.class,
+ MergeTransaction.class);
+ }
+ try {
+ Constructor<? extends MergeTransaction> c = mergeTransactionClass
+ .getConstructor(MasterServices.class, RegionMergeManager.class,
+ MergeTransactionData.class, HRegionInfo.class, HRegionInfo.class,
+ JournalState.class, MonitoredTask.class);
+ return c.newInstance(masterServices, mergeManager, transactionData,
+ mergeA, mergeB, initialState, status);
+ } catch (InvocationTargetException ite) {
+ // Unwrap to report the real cause of the constructor failure.
+ Throwable target = ite.getTargetException() != null ? ite
+ .getTargetException() : ite;
+ if (target.getCause() != null) {
+ target = target.getCause();
+ }
+ throw new RuntimeException("Failed construction of MergeTransaction: "
+ + mergeTransactionClass.toString(), target);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed construction of MergeTransaction: "
+ + mergeTransactionClass.toString()
+ + ((e.getCause() != null) ? e.getCause().getMessage() : ""), e);
+ }
+ }
+
+}