### Eclipse Workspace Patch 1.0
#P apache-trunk
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (working copy)
@@ -117,6 +117,10 @@
+ regionInfo.getRegionNameAsString() + " but "
+ "this table is disabled, triggering close of region");
assignmentManager.unassign(regionInfo);
+ } else if (this.assignmentManager.isRegionInMerging(regionInfo)) {
+ LOG.debug("Opened region " + regionInfo.getRegionNameAsString()
+ + " but " + "this region is in merging, triggering close of region");
+ assignmentManager.unassign(regionInfo);
}
}
}
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1439767)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -356,6 +356,12 @@
/** The upper-half split region column qualifier */
public static final byte [] SPLITB_QUALIFIER = toBytes("splitB");
+ /** The lower-half merge region column qualifier */
+ public static final byte[] MERGEA_QUALIFIER = toBytes("mergeA");
+
+ /** The upper-half merge region column qualifier */
+ public static final byte[] MERGEB_QUALIFIER = toBytes("mergeB");
+
/**
* The meta table version column qualifier.
* We keep current version of the meta table in this column in -ROOT-
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java (revision 0)
@@ -0,0 +1,493 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Executes region merge as a "transaction". Call {@link #prepare} to check
+ * whether the transaction is available, {@link #offlineRegion} to offline
+ * merging regions, {@link #executeMerging} to merge files on fs,
+ * {@link #completeMerging} and {@link #cancelMerging} to remove useless data.
+ *
+ *
+ * This class is not thread safe. Caller needs to ensure merge is run by one
+ * thread only.
+ */
+public class MergeTransaction {
+ private static final Log LOG = LogFactory.getLog(MergeTransaction.class);
+
+ private final HTableDescriptor htd;
+ private final HRegionInfo regionA;
+ private final HRegionInfo regionB;
+ // Region C merged from regionA and regionB
+ private HRegionInfo mergedRegion;
+ private final MergeTransactionData transactionData;
+ private final RegionMergeManager mergeManager;
+
+ // Which state does this merge transaction start from
+ private final JournalState initialState;
+ // Current state of this merge transaction
+ protected JournalState currentState;
+
+ private boolean successful = false;
+
+ private final Path rootDir;
+ private final FileSystem fs;
+
+ private final MonitoredTask status;
+
+ /**
+ * State of the transaction journal. Each enum is a state in the merge
+ * transaction. Used to redo after master restart
+ */
+ enum JournalState {
+ /**
+ * Initial state when requesting a merge
+ */
+ CREATE_MERGE,
+ /**
+ * Enter the offline region state after {@link #prepare()}
+ */
+ OFFLINE_REGION,
+ /**
+ * Enter the state executing merge after {@link #offlineRegion()}
+ */
+ EXECUTE_MERGING,
+ /**
+ * Enter the state completing merging after region merged
+ */
+ COMPLETE_MERGING,
+ /**
+ * Enter the state canceling merging if merge transaction is unavailable
+ */
+ CANCEL_MERGING,
+ /**
+ * State representing transaction being over
+ */
+ END
+ }
+
+ public MergeTransaction(final MasterServices masterServices,
+ final RegionMergeManager mergeManager,
+ final MergeTransactionData transactionData, final HRegionInfo mergeA,
+ final HRegionInfo mergeB, final JournalState initialState,
+ final MonitoredTask status) throws FileNotFoundException, IOException {
+ this.rootDir = masterServices.getMasterFileSystem().getRootDir();
+ this.fs = masterServices.getMasterFileSystem().getFileSystem();
+ this.mergeManager = mergeManager;
+ this.transactionData = transactionData;
+ this.htd = masterServices.getTableDescriptors().get(
+ transactionData.getTableName());
+ // Make sure regionA < regionB
+ if (mergeA == null || mergeA.compareTo(mergeB) < 0) {
+ this.regionA = mergeA;
+ this.regionB = mergeB;
+ } else {
+ this.regionA = mergeB;
+ this.regionB = mergeA;
+ }
+ this.initialState = initialState;
+ this.currentState = initialState;
+ this.status = status;
+ }
+
+ /**
+ * Does checks for merge inputs.
+ *
+ * Set state to OFFLINE_REGION if the merge is available else set the state to
+ * CANCEL_MERGING if it is not (e.g. a merging region is a split parent or has
+ * reference files).
+ * @throws IOException
+ * @throws KeeperException
+ * @throws NoNodeException
+ */
+ void prepare() throws IOException, NoNodeException, KeeperException {
+ status.setStatus("Preparing...");
+ boolean available = true;
+ if (regionA.isSplit() || regionA.isOffline()) {
+ available = false;
+ LOG.debug("Region " + regionA.getRegionNameAsString()
+ + " is split or offline");
+ } else if (regionB.isSplit() || regionB.isOffline()) {
+ available = false;
+ LOG.debug("Region " + regionB.getRegionNameAsString()
+ + " is split or offline");
+ } else if (HRegion.containsReferenceFile(fs, rootDir, regionA, htd)) {
+ available = false;
+ LOG.debug("Region " + regionA.getRegionNameAsString()
+ + " has reference, cancel the merge");
+ } else if (HRegion.containsReferenceFile(fs, rootDir, regionB, htd)) {
+ available = false;
+ LOG.debug("Region " + regionB.getRegionNameAsString()
+ + " has reference, cancel the merge");
+ } else if (!ensureNoParentRegion()) {
+ available = false;
+ LOG.debug("Exist parent region and can't be cleared, cancel the merge");
+ }
+ if (available) {
+ this.currentState = JournalState.OFFLINE_REGION;
+ } else {
+ this.currentState = JournalState.CANCEL_MERGING;
+ }
+ setJournalStateOnZK();
+ }
+
+ /**
+ * Drive the merge transaction state machine from the current state until it
+ * reaches END or the master is stopped
+ * @param masterServices master services, polled via isStopped() each loop
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void execute(MasterServices masterServices) throws IOException,
+ KeeperException, InterruptedException {
+ while (!masterServices.isStopped()) {
+ if (this.currentState.equals(JournalState.END)) {
+ break;
+ }
+ switch (this.currentState) {
+ case CREATE_MERGE:
+ prepare();
+ break;
+ case OFFLINE_REGION:
+ offlineRegion(masterServices);
+ break;
+ case EXECUTE_MERGING:
+ executeMerging(masterServices);
+ break;
+ case COMPLETE_MERGING:
+ completeMerging(masterServices);
+ break;
+ case CANCEL_MERGING:
+ cancelMerging(masterServices);
+ break;
+ default:
+ throw new RuntimeException("Unhandled merge state: " + currentState);
+ }
+ }
+ }
+
+ /**
+ * Offline the two merging regions
+ * @param masterServices
+ * @throws KeeperException
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ void offlineRegion(MasterServices masterServices) throws KeeperException,
+ InterruptedException, IOException {
+ status.setStatus("Offline region...");
+ masterServices.getAssignmentManager().unassign(this.regionA, true);
+ masterServices.getAssignmentManager().unassign(this.regionB, true);
+ mergeManager.waitUntilRegionsOffline(this.regionA, this.regionB);
+ mergeManager.waitUntilNoDeadServerInProcess(masterServices);
+ boolean conditionA = false;
+ if (mergeManager.isRegionSplit(this.regionA)
+ || mergeManager.isRegionSplit(this.regionB)) {
+ LOG.info("Canceling merge because region "
+ + (mergeManager.isRegionSplit(this.regionA) ? regionA
+ .getEncodedName() : regionB.getEncodedName()) + " has been split");
+ this.currentState = JournalState.CANCEL_MERGING;
+ } else if ((conditionA = containsEditsLogFile(this.regionA))
+ || containsEditsLogFile(this.regionB)) {
+ LOG.info("Canceling merge because region "
+ + (conditionA ? regionA.getEncodedName() : regionB.getEncodedName())
+ + " has edits log file");
+ this.currentState = JournalState.CANCEL_MERGING;
+ } else {
+ this.currentState = JournalState.EXECUTE_MERGING;
+ }
+ setJournalStateOnZK();
+ }
+
+ /**
+ * Merging the files in HDFS, add merged region to META and assign it
+ * @param masterServices
+ * @throws NoNodeException
+ * @throws KeeperException
+ * @throws IOException
+ */
+ void executeMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, IOException {
+ status.setStatus("Merging regions in fs...");
+ this.mergedRegion = HRegion.generateFixedRegionByMerge(htd, regionA,
+ regionB);
+ Pair<HRegionInfo, ServerName> mergedRegionInMeta = null;
+ boolean isMergedRegionSplit = false;
+ if (this.initialState == JournalState.EXECUTE_MERGING) {
+ // It is a redo from this step
+ mergedRegionInMeta = MetaReader.getRegion(
+ masterServices.getCatalogTracker(), mergedRegion.getRegionName());
+ if (mergedRegionInMeta == null) {
+ isMergedRegionSplit = checkRegionSplit(
+ masterServices.getCatalogTracker(), mergedRegion, regionA);
+ LOG.debug("Redo merging " + transactionData + " from " + initialState
+ + " and check whether merged region "
+ + mergedRegion.getRegionNameAsString() + " is split = "
+ + isMergedRegionSplit);
+ }
+ }
+ if (!isMergedRegionSplit && mergedRegionInMeta == null) {
+ this.mergedRegion = HRegion.merge(masterServices.getConfiguration(), fs,
+ rootDir, htd, regionA, regionB);
+ MetaEditor.addMergedRegionToMeta(masterServices.getCatalogTracker(),
+ this.mergedRegion, regionA, regionB);
+ }
+ status.setStatus("Assigning merged region "
+ + mergedRegion.getRegionNameAsString());
+ if (!isMergedRegionSplit
+ && (mergedRegionInMeta == null || mergedRegionInMeta.getSecond() == null)) {
+ if (mergeManager.isRegionOffline(mergedRegion)) {
+ masterServices.getAssignmentManager().assign(this.mergedRegion, true);
+ }
+ }
+ this.currentState = JournalState.COMPLETE_MERGING;
+ setJournalStateOnZK();
+ }
+
+ /**
+ * Delete merging regions from META after completing the merge successfully
+ * @param masterServices
+ * @throws KeeperException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void completeMerging(MasterServices masterServices) throws KeeperException,
+ IOException, InterruptedException {
+ status.setStatus("Completing merging... ");
+ // Remove the two merged-away regions from .META.
+ if (this.regionA != null) {
+ MetaEditor.deleteRegion(masterServices.getCatalogTracker(), this.regionA);
+ }
+ if (this.regionB != null) {
+ MetaEditor.deleteRegion(masterServices.getCatalogTracker(), this.regionB);
+ }
+ this.mergeManager.waitUntilNoDeadServerInProcess(masterServices);
+ this.mergeManager.finishMergeTransaction(transactionData, true);
+ this.currentState = JournalState.END;
+ this.successful = true;
+ }
+
+ /**
+ * Merge is unavailable, cancel it; Assign the merging region if it is offline
+ * because of merging
+ * @param masterServices
+ * @throws NoNodeException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ void cancelMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, InterruptedException {
+ status.setStatus("Canceling merging... ");
+ this.mergeManager.assignMergingRegionIfOffline(this.regionA);
+ this.mergeManager.assignMergingRegionIfOffline(this.regionB);
+ this.mergeManager.finishMergeTransaction(transactionData, true);
+ this.currentState = JournalState.END;
+ }
+
+ protected void setJournalStateOnZK() throws NoNodeException,
+ KeeperException {
+ LOG.debug("Setting state " + currentState + " on zk for merging request "
+ + transactionData);
+ mergeManager.setJournalStateOnZK(transactionData.getTransactionName(),
+ this.currentState);
+ }
+
+ /**
+ * Check whether region has edits log file
+ * @param region region whose directory is checked for recovered edits
+ * @return true if region contains edits log file
+ * @throws IOException
+ */
+ private boolean containsEditsLogFile(final HRegionInfo region)
+ throws IOException {
+ Path tableDir = new Path(rootDir, region.getTableNameAsString());
+ Path regiondir = HRegion.getRegionDir(tableDir, region.getEncodedName());
+ Set<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
+ return files != null && !files.isEmpty();
+ }
+
+ /**
+ * Between offline region and start merging, make sure there is no parent
+ * region of the two regions in .META. If exists, fixing up daughters would
+ * cause daughter regions (we have merged one) online again when we restart
+ * master, so we clear the parent region in this function to prevent the above
+ * case
+ * @return true if there is no parent region of merging regions
+ * @throws IOException
+ */
+ private boolean ensureNoParentRegion() throws IOException {
+ Map<HRegionInfo, Result> splitParents = mergeManager
+ .getSplitParents(regionA.getTableName());
+ for (Map.Entry<HRegionInfo, Result> splitParent : splitParents.entrySet()) {
+ // check whether its range contains merging region
+ if (doRegionsOverlap(regionA, splitParent.getKey())
+ || doRegionsOverlap(regionB, splitParent.getKey())) {
+ LOG.warn("Region " + regionA.getEncodedName() + " or region "
+ + regionB.getEncodedName() + " has parent region "
+ + splitParent.getKey().getRegionNameAsString()
+ + ", try to clean it");
+ if (!this.mergeManager.cleanParent(splitParent.getKey(),
+ splitParent.getValue())) {
+ LOG.warn("Failed cleaning parent region "
+ + splitParent.getKey().getRegionNameAsString()
+ + ", canceling merge");
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Check whether the region is split from the META
+ * @param catalogTracker
+ * @param checkRegion
+ * @param excludeRegion
+ *
+ * @return true if region is split
+ * @throws IOException
+ */
+ static boolean checkRegionSplit(final CatalogTracker catalogTracker,
+ final HRegionInfo checkRegion, final HRegionInfo excludeRegion)
+ throws IOException {
+ final AtomicBoolean isSplit = new AtomicBoolean(false);
+ byte[] startRow = checkRegion.getRegionName();
+ final byte[] startKey = checkRegion.getStartKey();
+ final byte[] endKey = checkRegion.getEndKey();
+ final byte[] tableName = checkRegion.getTableName();
+ MetaReader.fullScan(catalogTracker, new MetaReader.Visitor() {
+ @Override
+ public boolean visit(Result r) throws IOException {
+ if (r == null || r.isEmpty()) return true;
+ HRegionInfo info = HRegionInfo.getHRegionInfo(r);
+ if (info == null) return true; // Keep scanning
+ // other table, stop scanning
+ if (Bytes.compareTo(info.getTableName(), tableName) > 0) return false;
+ if (Bytes.compareTo(info.getTableName(), tableName) == 0) {
+ if (Bytes.compareTo(info.getStartKey(), startKey) == 0) {
+ if (Bytes.compareTo(info.getEndKey(), endKey) == 0) {
+ // found itself, keep scanning
+ return true;
+ } else if (info.equals(excludeRegion)) {
+ return true;
+ }
+ isSplit.set(true);
+ return false;
+ } else if (Bytes.compareTo(info.getStartKey(), startKey) > 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }, startRow);
+ return isSplit.get();
+ }
+
+ public boolean isSuccessful() {
+ return this.successful;
+ }
+
+ public HRegionInfo getMergedRegion() {
+ return this.mergedRegion;
+ }
+
+ public JournalState getCurrentState() {
+ return this.currentState;
+ }
+
+ /**
+ * Check whether there is overlap between region A and region B
+ * @param regionA
+ * @param regionB
+ * @return true if regions' key ranges overlap
+ */
+ private static boolean doRegionsOverlap(HRegionInfo regionA,
+ HRegionInfo regionB) {
+ // recurse with swapped args so the region with the larger start key is A
+ if (Bytes.compareTo(regionA.getStartKey(), regionB.getStartKey()) < 0) {
+ return doRegionsOverlap(regionB, regionA);
+ } else {
+ if (Bytes.compareTo(regionA.getStartKey(), regionB.getEndKey()) < 0
+ || Bytes.equals(regionB.getEndKey(), HConstants.EMPTY_END_ROW)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Get the bytes for a specified JournalState
+ * @param state
+ * @return the bytes of given journal state
+ */
+ static byte[] getBytesFromJournalState(JournalState state) {
+ return Bytes.toBytes(state.toString());
+ }
+
+
+ /**
+ * Get a JournalState from bytes. Throws a {@link RuntimeException} if the
+ * bytes do not represent a known state
+ * @param data
+ * @return the journal state encoded by the given bytes
+ */
+ static JournalState parseJournalStateFromBytes(byte[] data) {
+ String stateString = Bytes.toString(data);
+ for (JournalState state : JournalState.values()) {
+ if (state.toString().equals(stateString))
+ return state;
+ }
+ throw new RuntimeException("Unknown JournalState " + stateString);
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy)
@@ -102,6 +102,8 @@
protected final Server server;
private ServerManager serverManager;
+
+ private RegionMergeManager mergeManager;
private CatalogTracker catalogTracker;
@@ -1026,6 +1028,11 @@
LOG.info("Opened region " + regionNameStr
+ "but this table is disabled, triggering close of region");
unassign(regionInfo);
+ } else if (isRegionInMerging(regionInfo)) {
+ LOG.debug("Opened region "
+ + regionInfo.getRegionNameAsString() + " but "
+ + "this region is in merging, triggering close of region");
+ unassign(regionInfo);
}
}
}
@@ -2180,6 +2187,8 @@
Map allRegions = MetaReader.fullScan(
catalogTracker, disabledOrDisablingOrEnabling, true);
if (allRegions == null || allRegions.isEmpty()) return;
+ // Remove the merging regions
+ removeMergingRegions(allRegions);
// Determine what type of assignment to do on startup
boolean retainAssignment = server.getConfiguration().
@@ -2732,6 +2741,9 @@
public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
final HRegionInfo a, final HRegionInfo b) {
regionOffline(parent);
+ if (this.mergeManager != null) {
+ this.mergeManager.notifyRegionSplit(parent);
+ }
regionOnline(a, sn);
regionOnline(b, sn);
@@ -2813,4 +2825,90 @@
}
return true;
}
+
+ void setMergeManager(RegionMergeManager mergeManager) {
+ assert this.mergeManager == null;
+ this.mergeManager = mergeManager;
+ }
+
+ public RegionMergeManager getMergeManager() {
+ return this.mergeManager;
+ }
+
+ public boolean isRegionInMerging(HRegionInfo regionInfo) {
+ return this.mergeManager != null
+ && this.mergeManager.isRegionInMerging(regionInfo);
+ }
+
+ /**
+ * Offline the specified region which is merging now
+ * @param regionInfo
+ */
+ public void offlineMergingRegion(HRegionInfo regionInfo) {
+ // Region is merging so should not be reassigned, just delete the CLOSED
+ // node
+ LOG.debug("Region is in merging so deleting ZK node and removing from "
+ + "regions in transition, skipping assignment of region "
+ + regionInfo.getRegionNameAsString());
+ try {
+ if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
+ // Could also be in OFFLINE mode
+ ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
+ }
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.debug("Tried to delete closed node for " + regionInfo + " but it "
+ + "does not exist so just offlining");
+ } catch (KeeperException e) {
+ this.server.abort("Error deleting CLOSED node in ZK for region "
+ + regionInfo.getEncodedName(), e);
+ }
+ regionOffline(regionInfo);
+ if (this.mergeManager != null) {
+ this.mergeManager.notifyRegionOffline(regionInfo);
+ }
+ }
+
+ /**
+ * Remove the merging regions from the specified map of regions
+ * @param regions map of region to its server, mutated in place
+ */
+ private void removeMergingRegions(Map<HRegionInfo, ServerName> regions) {
+ if (this.mergeManager != null && this.mergeManager.isAnyRegionInMerging()) {
+ Iterator<HRegionInfo> iterator = regions.keySet().iterator();
+ while (iterator.hasNext()) {
+ if (this.mergeManager.isRegionInMerging(iterator.next())) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the merging region for the specified list of regions
+ * @param regions candidate regions to filter
+ * @return list of regions which are merging
+ */
+ public List<HRegionInfo> getMergingRegions(List<HRegionInfo> regions) {
+ List<HRegionInfo> regionsInMerging = new ArrayList<HRegionInfo>();
+ if (this.mergeManager != null && this.mergeManager.isAnyRegionInMerging()) {
+ for (HRegionInfo region : regions) {
+ if (this.mergeManager.isRegionInMerging(region)) {
+ regionsInMerging.add(region);
+ }
+ }
+ }
+ return regionsInMerging;
+ }
+
+ /**
+ * Get the number of merging regions for the specified table
+ * @param tableName table to inspect
+ * @return number of merging regions of given table
+ */
+ public int getNumOfMergingRegionsForTable(String tableName) {
+ if (this.mergeManager != null) {
+ return mergeManager.getNumOfMergingRegionsForTable(tableName);
+ }
+ return 0;
+ }
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (working copy)
@@ -319,6 +319,11 @@
+ " is disabled. Hence not assigning region" + hri.getEncodedName());
return false;
}
+ if (assignmentManager.isRegionInMerging(hri)) {
+ LOG.info("Region " + hri.getRegionNameAsString()
+ + " is merging. Hence not assigning it");
+ return false;
+ }
return true;
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java (revision 0)
@@ -0,0 +1,610 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.MergeTransaction.JournalState;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Test the {@link MergeTransaction} class against two merging regions.
+ */
+@Category(LargeTests.class)
+public class TestMergeTransaction {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Log LOG = LogFactory.getLog(TestMergeTransaction.class);
+ private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
+ private static final byte[] QUALIFIER = Bytes.toBytes("q");
+ private static final int slaves = 2;
+
+ private static byte[] ROW = Bytes.toBytes("testRow");
+ private static final int INITIAL_REGION_NUM = 10;
+ private static final int ROWSIZE = 200;
+ private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ private static int waitTime = 30 * 1000;
+
+ private enum ExceptionLocation {
+ NONE,
+ BEFORE_OFFLINE,
+ BEFORE_EXECUTE,
+ BEFORE_COMPLETE,
+ BEFORE_CANCEL,
+ SET_ZK
+ }
+
+ // Throw exception in certain step, simulating redo
+ private static ExceptionLocation exceptionThrownLocation = ExceptionLocation.NONE;
+
+ private static TestingMergeTransaction currentMergeTransaction;
+
+ private static boolean forceEnterCancelState = false;
+
+ private static AtomicInteger successfullyCompletedCount = new AtomicInteger(0);
+
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+ TEST_UTIL.getConfiguration().setClass(
+ "hbase.master.merge.transaction.impl", TestingMergeTransaction.class,
+ MergeTransaction.class);
+ TEST_UTIL.getConfiguration().setBoolean("hbase.hregion.memstore.mslab.enabled", false);
+ TEST_UTIL.getConfiguration().setInt("hbase.merge.number.tries", 1);
+ // Start a cluster of two regionservers.
+ TEST_UTIL.startMiniCluster(slaves);
+ }
+
+ @AfterClass
+ public static void afterAllTests() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void after() {
+ currentMergeTransaction = null;
+ forceEnterCancelState = false;
+ }
+
+ @Test
+ public void testWholesomeMerge() throws Exception {
+ LOG.info("Starting testWholesomeMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testWholesomeMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Merge 1st and 2nd region
+ mergeAndVerify(master, tablename, 0, 1, INITIAL_REGION_NUM - 1);
+
+ // Merge 2nd and 3rd region
+ mergeAndVerify(master, tablename, 1, 2, INITIAL_REGION_NUM - 2);
+
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenOfflineRegionEncountersException()
+ throws Exception {
+ LOG.info("Starting testRedoMergeWhenOfflineRegion");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenOfflineRegion");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when offline region
+ exceptionThrownLocation = ExceptionLocation.BEFORE_OFFLINE;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ exceptionThrownLocation = ExceptionLocation.SET_ZK;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ exceptionThrownLocation = ExceptionLocation.NONE;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenExecuteMergeEncountersException()
+ throws Exception {
+ LOG.info("Starting testRedoMergeWhenExecuteMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenExecuteMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when execute merge
+ exceptionThrownLocation = ExceptionLocation.BEFORE_EXECUTE;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ exceptionThrownLocation = ExceptionLocation.SET_ZK;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyResubmitQueueSize(master, 1);
+
+ exceptionThrownLocation = ExceptionLocation.NONE;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenCompleteMergeEncountersException()
+ throws Exception {
+ LOG.info("Starting testRedoMergeWhenCompleteMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenCompleteMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when complete merge
+ exceptionThrownLocation = ExceptionLocation.BEFORE_COMPLETE;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+ exceptionThrownLocation = ExceptionLocation.NONE;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1);
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testRedoMergeWhenCancelMergeEncountersException()
+ throws Exception {
+ LOG.info("Starting testRedoMergeWhenCancelMerge");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenCancelMerge");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ // Throw exception when cancel merge
+ exceptionThrownLocation = ExceptionLocation.BEFORE_CANCEL;
+ requestMergeRegion(master, tablename, 0, 1);
+ waitAndVerifyResubmitQueueSize(master, 1);
+ exceptionThrownLocation = ExceptionLocation.NONE;
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM);
+ verifyRowCount(table, ROWSIZE);
+
+ table.close();
+ }
+
+ @Test
+ public void testConcurrentRegionMergingAndRegionSplitting()
+ throws Exception {
+ LOG.info("Starting testConcurrentRegionMergingAndRegionSplitting");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ final HMaster master = cluster.getMaster();
+ final byte[] tablename = Bytes
+ .toBytes("testConcurrentRegionMergingAndRegionSplitting");
+ final int numRegions = 100;
+ HTable table = createTableAndLoadData(master, tablename, numRegions);
+ final List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
+ .getTableRegionsAndLocations(master.getCatalogTracker(),
+ Bytes.toString(tablename));
+ assert tableRegions.size()==numRegions;
+
+ final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+ SecureRandom random = new SecureRandom();
+ int successfulCountBeforeTest = successfullyCompletedCount.get();
+
+ final List<byte[]> splitPoints = new ArrayList<byte[]>();
+ for (int i = 0; i < numRegions; i++) {
+ HRegionInfo regionInfo = tableRegions.get(i).getFirst();
+ byte[] splitPoint=ROWS[random.nextInt(ROWSIZE / numRegions)
+ + i * ROWSIZE / numRegions];
+ assertTrue(regionInfo.getRegionNameAsString() + " doesn't contain row "
+ + Bytes.toString(splitPoint), regionInfo.containsRow(splitPoint));
+ splitPoints.add(splitPoint);
+ }
+
+ Thread mergeRequestThread=new Thread(){
+ public void run() {
+ for (int i = 0; i < numRegions; i += 2) {
+ try {
+ RegionMergeManager.createMergeRequest(master.getZooKeeper(),
+ Bytes.toString(tablename), tableRegions.get(i).getFirst()
+ .getEncodedName(), tableRegions.get(i + 1).getFirst()
+ .getEncodedName(), false);
+ } catch (Exception e) {
+ fail("Failed creating merge request for region "
+ + tableRegions.get(i).getFirst().getEncodedName() + " and "
+ + tableRegions.get(i + 1).getFirst().getEncodedName());
+ }
+ }
+ }
+ };
+
+ Thread splitRequestThread = new Thread() {
+ public void run() {
+ for (int i = 0; i < numRegions; i++) {
+ try{
+ admin.split(tableRegions.get(i).getFirst().getRegionName(),
+ splitPoints.get(i));
+ } catch (Exception e) {
+ LOG.info("Region may be in merging, failed calling split, "
+ + e.getMessage());
+ }
+
+ }
+ }
+ };
+ mergeRequestThread.start();
+ splitRequestThread.start();
+ mergeRequestThread.join();
+ splitRequestThread.join();
+ waitUntilNoRegionInMerging(master.assignmentManager.getMergeManager());
+
+ int successfulCountThisTest = successfullyCompletedCount.get() - successfulCountBeforeTest;
+ LOG.info("Successfully merge " + successfulCountThisTest * 2 + " regions in testing");
+ verifyRowCount(table, ROWSIZE);
+ table.close();
+ }
+
+ /**
+ * Restart master and regionserver when do merging
+ * @throws Exception
+ */
+ @Test
+ public void testRedoMergeWhenServerRestart() throws Exception {
+ LOG.info("Starting testRedoMergeWhenServerRestart");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ byte[] tablename = Bytes.toBytes("testRedoMergeWhenServerRestart");
+ HTable table = createTableAndLoadData(master, tablename);
+
+ LOG.info("Test restarting servers after entering state "
+ + JournalState.CREATE_MERGE);
+ restartServersAfterExpectedState(table, tablename,
+ JournalState.CREATE_MERGE);
+
+ LOG.info("Test restarting servers after entering state "
+ + JournalState.OFFLINE_REGION);
+ restartServersAfterExpectedState(table, tablename,
+ JournalState.OFFLINE_REGION);
+
+ LOG.info("Test restarting servers after entering state "
+ + JournalState.EXECUTE_MERGING);
+ restartServersAfterExpectedState(table, tablename,
+ JournalState.EXECUTE_MERGING);
+
+ LOG.info("Test restarting servers after entering state "
+ + JournalState.COMPLETE_MERGING);
+ restartServersAfterExpectedState(table, tablename,
+ JournalState.COMPLETE_MERGING);
+
+ LOG.info("Test restarting servers after entering state "
+ + JournalState.CANCEL_MERGING);
+ forceEnterCancelState = true;
+ restartServersAfterExpectedState(table, tablename,
+ JournalState.CANCEL_MERGING);
+ forceEnterCancelState = false;
+
+ table.close();
+ }
+
+ private void restartServersAfterExpectedState(final HTable table,
+ final byte[] tablename, JournalState expectedState) throws Exception {
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ int regionNumBeforeMerging = getTableRegionsNum(master, tablename);
+ currentMergeTransaction = null;
+ requestMergeRegion(master, tablename, 0, 1);
+ LOG.info("Restarting servers after entering state " + expectedState);
+ waitUntilEnterExpectedState(expectedState);
+ restartMasterAndServer();
+ // Get master again after restart;
+ master = cluster.getMaster();
+ master.assignmentManager.getMergeManager().triggerResubmissionForTest();
+ waitUntilEnterExpectedState(JournalState.END);
+
+ if (currentMergeTransaction.isSuccessful()) {
+ waitAndVerifyRegionNum(master, tablename, regionNumBeforeMerging - 1);
+ }
+ verifyRowCount(table, ROWSIZE);
+ }
+
+ private void restartMasterAndServer() throws Exception {
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ List<JVMClusterUtil.RegionServerThread> liveRSs = cluster
+ .getLiveRegionServerThreads();
+ assert liveRSs.size() == slaves;
+ LOG.info("Aborting regionserver...");
+ SecureRandom random = new SecureRandom();
+ HRegionServer rs = liveRSs.get(random.nextInt(slaves)).getRegionServer();
+ rs.abort("Aborting for tests", new Exception("Trace info"));
+ cluster.waitForRegionServerToStop(rs.getServerName(), waitTime);
+
+ List<JVMClusterUtil.MasterThread> liveMasters = cluster
+ .getLiveMasterThreads();
+ assert liveMasters.size() == 1;
+ LOG.info("Aborting master...");
+ master.abort("Aborting for tests", new Exception("Trace info"));
+ cluster.waitForMasterToStop(master.getServerName(), waitTime);
+
+ master = cluster.startMaster().getMaster();
+ TEST_UTIL.ensureSomeRegionServersAvailable(slaves);
+ long timeout = System.currentTimeMillis() + waitTime;
+ while (!master.isInitialized() && System.currentTimeMillis() < timeout) {
+ Thread.sleep(50);
+ }
+ }
+
+ private void waitUntilNoRegionInMerging(final RegionMergeManager regionMergeManager)
+ throws Exception {
+ long timeout = System.currentTimeMillis() + waitTime;
+ while (System.currentTimeMillis() < timeout) {
+ if (regionMergeManager.getMergingRegions().isEmpty()) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+ assertTrue("Timeout waiting merging, regions in merging:"
+ + regionMergeManager.getMergingRegions(),
+ regionMergeManager.getMergingRegions().isEmpty());
+ }
+
+ private void waitUntilEnterExpectedState(JournalState expectedState)
+ throws Exception {
+ long timeout = System.currentTimeMillis() + waitTime;
+ while (currentMergeTransaction == null
+ || currentMergeTransaction.currentState.ordinal() < expectedState
+ .ordinal()) {
+ Thread.sleep(5);
+ if (System.currentTimeMillis() > timeout) {
+ fail("Timeout waiting merge to enter state "
+ + expectedState + ", currentState="
+ + (currentMergeTransaction == null ? "NULL"
+ : currentMergeTransaction.currentState));
+ break;
+ }
+ }
+ }
+
+ /**
+ * An MergeTransaction instance used in this test.
+ */
+ public static class TestingMergeTransaction extends MergeTransaction {
+
+ public TestingMergeTransaction(MasterServices masterServices,
+ RegionMergeManager mergeManager, MergeTransactionData transactionData,
+ HRegionInfo mergeA, HRegionInfo mergeB, JournalState initialState,
+ MonitoredTask status) throws FileNotFoundException, IOException {
+ super(masterServices, mergeManager, transactionData, mergeA, mergeB,
+ initialState, status);
+ currentMergeTransaction = this;
+ }
+
+ @Override
+ void offlineRegion(MasterServices masterServices) throws KeeperException,
+ InterruptedException, IOException {
+ throwExceptionIfTest(ExceptionLocation.BEFORE_OFFLINE);
+ super.offlineRegion(masterServices);
+ }
+
+ @Override
+ void executeMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, IOException {
+ throwExceptionIfTest(ExceptionLocation.BEFORE_EXECUTE);
+ super.executeMerging(masterServices);
+ }
+
+ @Override
+ void completeMerging(MasterServices masterServices) throws KeeperException,
+ IOException, InterruptedException {
+ throwExceptionIfTest(ExceptionLocation.BEFORE_COMPLETE);
+ super.completeMerging(masterServices);
+ successfullyCompletedCount.incrementAndGet();
+ }
+
+ @Override
+ void cancelMerging(MasterServices masterServices) throws NoNodeException,
+ KeeperException, InterruptedException {
+ throwExceptionIfTest(ExceptionLocation.BEFORE_CANCEL);
+ super.cancelMerging(masterServices);
+ }
+
+ @Override
+ void prepare() throws IOException, NoNodeException, KeeperException {
+ super.prepare();
+ if (forceEnterCancelState
+ || exceptionThrownLocation == ExceptionLocation.BEFORE_CANCEL) {
+ super.currentState = JournalState.CANCEL_MERGING;
+ super.setJournalStateOnZK();
+ }
+ }
+
+ @Override
+ protected void setJournalStateOnZK() throws NoNodeException,
+ KeeperException {
+ throwExceptionIfTest(ExceptionLocation.SET_ZK);
+ super.setJournalStateOnZK();
+ }
+
+ void throwExceptionIfTest(ExceptionLocation expectedLocation) {
+ if (exceptionThrownLocation == expectedLocation) {
+ throw new RuntimeException("Test");
+ }
+ }
+
+ }
+
+ private HTable createTableAndLoadData(HMaster master, byte[] tablename)
+ throws Exception {
+ return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM);
+ }
+
+ private HTable createTableAndLoadData(HMaster master, byte[] tablename,
+ int numRegions) throws Exception {
+ assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
+ byte[][] splitRows = new byte[numRegions - 1][];
+ for (int i = 0; i < splitRows.length; i++) {
+ splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
+ }
+
+ HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
+ loadData(table);
+ verifyRowCount(table, ROWSIZE);
+
+ // sleep here is an ugly hack to allow region transitions to finish
+ long timeout = System.currentTimeMillis() + waitTime;
+ List<Pair<HRegionInfo, ServerName>> tableRegions;
+ while (System.currentTimeMillis() < timeout) {
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ if (tableRegions.size() == numRegions)
+ break;
+ Thread.sleep(250);
+ }
+
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
+ assertEquals(numRegions, tableRegions.size());
+ return table;
+ }
+
+ private void loadData(HTable table) throws IOException {
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ put.add(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
+ table.put(put);
+ }
+ }
+
+ private void verifyRowCount(HTable table, int expectedRowCount)
+ throws IOException {
+ ResultScanner scanner = table.getScanner(new Scan());
+ int rowCount = 0;
+ while (scanner.next() != null) {
+ rowCount++;
+ }
+ assertEquals(expectedRowCount, rowCount);
+ scanner.close();
+ }
+
+
+ private void mergeAndVerify(HMaster master, byte[] tablename, int regionAnum,
+ int regionBnum, int expectedRegionNum) throws Exception {
+ requestMergeRegion(master, tablename, regionAnum, regionBnum);
+ waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
+
+ }
+
+ private void requestMergeRegion(HMaster master, byte[] tablename,
+ int regionAnum, int regionBnum) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
+ .getTableRegionsAndLocations(master.getCatalogTracker(),
+ Bytes.toString(tablename));
+ TEST_UTIL.getHBaseAdmin().mergeRegion(tablename,
+ tableRegions.get(regionAnum).getFirst().getEncodedNameAsBytes(),
+ tableRegions.get(regionBnum).getFirst().getEncodedNameAsBytes(), false);
+ }
+
+ private void waitAndVerifyRegionNum(HMaster master, byte[] tablename,
+ int expectedRegionNum) throws Exception {
+ List<Pair<HRegionInfo, ServerName>> tableRegions;
+ long timeout = System.currentTimeMillis() + waitTime;
+ while (System.currentTimeMillis() < timeout) {
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ if (tableRegions.size() == expectedRegionNum)break;
+ Thread.sleep(250);
+ }
+
+ tableRegions = MetaReader.getTableRegionsAndLocations(
+ master.getCatalogTracker(), Bytes.toString(tablename));
+ LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegions));
+ assertEquals(expectedRegionNum, tableRegions.size());
+ }
+
+ private int getTableRegionsNum(HMaster master, byte[] tablename)
+ throws Exception {
+ return MetaReader.getTableRegionsAndLocations(master.getCatalogTracker(),
+ Bytes.toString(tablename)).size();
+ }
+
+ private void waitAndVerifyResubmitQueueSize(HMaster master, int expectedSize)
+ throws Exception {
+ int queueSize;
+ long timeout = System.currentTimeMillis() + waitTime;
+ while (System.currentTimeMillis() < timeout) {
+ queueSize = master.getAssignmentManager().getMergeManager().resubmitQueusSize();
+ if (queueSize == expectedSize) break;
+ Thread.sleep(250);
+ }
+
+ queueSize = master.getAssignmentManager().getMergeManager().resubmitQueusSize();
+ assertEquals(expectedSize, queueSize);
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
+ }
+ return ret;
+ }
+
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
@@ -131,6 +132,8 @@
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MergeRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MergeRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
@@ -269,6 +272,9 @@
// Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker;
+ // manager of region merge transactions
+ private RegionMergeManager regionMergeManager;
+
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
// operations/debugging.
@@ -504,6 +510,7 @@
if (this.catalogTracker != null) this.catalogTracker.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
+ if (this.regionMergeManager != null) this.regionMergeManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
this.zooKeeper.close();
}
@@ -551,6 +558,9 @@
this.catalogTracker, this.balancer, this.executorService, this.metricsMaster);
zooKeeper.registerListenerFirst(assignmentManager);
+ this.regionMergeManager = new RegionMergeManager(this, this.zooKeeper);
+ this.regionMergeManager.start();
+
this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
this.serverManager);
this.regionServerTracker.start();
@@ -1472,6 +1482,30 @@
}
@Override
+ public MergeRegionResponse mergeRegion(RpcController controller,
+ MergeRegionRequest req) throws ServiceException {
+ MergeRegionResponse mrr = MergeRegionResponse.newBuilder().build();
+ try{
+ LOG.info("Received request to merge region: "
+ + req.getRegionAName().toString(HConstants.UTF8_ENCODING) + " and "
+ + req.getRegionBName().toString(HConstants.UTF8_ENCODING)
+ + " in table "
+ + req.getTableName().toString(HConstants.UTF8_ENCODING) + " on "
+ + this.serverName);
+ RegionMergeManager.createMergeRequest(this.zooKeeper, req.getTableName()
+ .toString(HConstants.UTF8_ENCODING),
+ req.getRegionAName().toString(HConstants.UTF8_ENCODING), req
+ .getRegionBName().toString(HConstants.UTF8_ENCODING), req
+ .getForce());
+ } catch (KeeperException ke) {
+ throw new ServiceException(ke);
+ } catch (UnsupportedEncodingException e) {
+ throw new ServiceException(e);
+ }
+ return mrr;
+ }
+
+ @Override
public void createTable(HTableDescriptor hTableDescriptor,
byte [][] splitKeys)
throws IOException {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (working copy)
@@ -361,6 +361,7 @@
private static final int DEFAULT_BLOCKING_TIMEOUT = 10000;
private Configuration conf;
private TreeSet<byte[]> daughterRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ private TreeSet<byte[]> mergingRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
private int blockingTimeout;
private HTable metaTable;
@@ -394,6 +395,25 @@
if (info == null) {
return true;
}
+ byte[] mergeA = rowResult.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.MERGEA_QUALIFIER);
+ if (mergeA != null) {
+ // It is a merged region, block here until merging regions not in META
+ HRegionInfo regionA = HRegionInfo.parseFromOrNull(mergeA);
+ HRegionInfo regionB = HRegionInfo.getHRegionInfo(rowResult,
+ HConstants.MERGEB_QUALIFIER);
+ mergingRegions.add(regionA.getRegionName());
+ if (regionB != null) {
+ mergingRegions.add(regionB.getRegionName());
+ }
+ }
+ if (mergingRegions.remove(info.getRegionName())) {
+ return true; // This region has been merged
+ } else if (!mergingRegions.isEmpty()) {
+ if (mergingRegions.higher(info.getRegionName()) == null) {
+ mergingRegions.clear();
+ }
+ }
if (daughterRegions.remove(info.getRegionName())) {
return true; //we have already processed this row
@@ -431,7 +451,6 @@
splitB.getRegionNameAsString() + " cannot be found in META.");
}
}
-
return processRowInternal(rowResult);
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy)
@@ -137,6 +137,8 @@
.getTableRegionsAndLocations(this.ct, tableName, true);
int countOfRegionsInTable = tableRegionsAndLocations.size();
List<HRegionInfo> regions = regionsToAssignWithServerName(tableRegionsAndLocations);
+ List<HRegionInfo> regionsInMerging = assignmentManager.getMergingRegions(regions);
+ regions.removeAll(regionsInMerging);
int regionsCount = regions.size();
if (regionsCount == 0) {
done = true;
@@ -266,7 +268,8 @@
}
private boolean isDone(final List<HRegionInfo> regions) {
- return regions != null && regions.size() >= this.countOfRegionsInTable;
+ return regions != null && (regions.size() + assignmentManager
+ .getNumOfMergingRegionsForTable(tableNameStr)) >= this.countOfRegionsInTable;
}
}
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0)
@@ -0,0 +1,239 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.master.RegionMergeManager;
+import org.apache.hadoop.hbase.master.RegionMergeManager.MergeTransactionData;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility that can merge any two regions in the same table: adjacent,
+ * overlapping or disjoint. Different from {@link Merge}, it needn't shutdown
+ * cluster or disable table;
+ *
+ */
+public class OnlineMerge extends Configured implements Tool {
+ static final Log LOG = LogFactory.getLog(OnlineMerge.class);
+
+ private String tableName; // Name of table
+ private String region1EncodedName; // EncodedName of region 1
+ private String region2EncodedName; // EncodedName of region 2
+ private HRegionInfo region1Info; // Info of region 1
+ private HRegionInfo region2Info; // Info of region 2
+ private HRegionInfo mergedRegionInfo; // Info of merged region
+ private HTableDescriptor htd;
+ private boolean isForce = false;
+ private boolean sync = true;
+ private CatalogTracker catalogTracker;
+
+ /** default constructor */
+ public OnlineMerge() {
+ super();
+ }
+
+ /**
+ * @param conf configuration
+ */
+ public OnlineMerge(Configuration conf) {
+ setConf(conf);
+ }
+
+ private int parseArgs(String[] args) throws IOException {
+ GenericOptionsParser parser = new GenericOptionsParser(getConf(), args);
+
+ String[] remainingArgs = parser.getRemainingArgs();
+ if (remainingArgs.length != 3) {
+ usage();
+ return -1;
+ }
+ tableName = remainingArgs[0];
+ region1EncodedName = remainingArgs[1];
+ region2EncodedName = remainingArgs[2];
+ int status = 0;
+ if (region1EncodedName.equals(region2EncodedName)) {
+ LOG.error("Can't merge a region with itself");
+ status = -1;
+ }
+ return status;
+ }
+
+ private void usage() {
+ System.err.println("Usage: bin/hbase org.apache.hadoop.hbase.util.OnlineMerge"
+ + " [-force] [-async] [-show] "
+ + " <table name> <region1 encoded name> <region2 encoded name>\n");
+ }
+
+ @SuppressWarnings("deprecation")
+ public int run(String[] args) throws Exception {
+ if (parseArgs(args) != 0) {
+ return -1;
+ }
+ HBaseAdmin hbaseAdmin = new HBaseAdmin(getConf());
+ HConnection connection = HConnectionManager.getConnection(getConf());
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ try {
+ if (sync) {
+ this.htd = connection.getHTableDescriptor(Bytes.toBytes(tableName));
+ if (htd == null) {
+ System.err.println("Couldn't get table descriptor");
+ return -1;
+ }
+ this.catalogTracker = new CatalogTracker(zkw, getConf(), connection);
+ this.catalogTracker.start();
+ this.initRegionInfoByEncodedName(catalogTracker);
+ if (this.mergedRegionInfo == null) {
+ System.err.println("Couldn't get region info by encoded name");
+ return -1;
+ }
+ }
+ String transactionName = MergeTransactionData.generateTransactionName(
+ tableName, region1EncodedName, region2EncodedName, isForce);
+ System.out.println("Send merge request " + transactionName);
+ hbaseAdmin.mergeRegion(Bytes.toBytes(tableName),Bytes.toBytes(region1EncodedName),
+ Bytes.toBytes(region2EncodedName),isForce);
+ if (sync) {
+ String transactionNode = ZKUtil.joinZNode(zkw.mergeZNode, transactionName);
+ System.out.print("Waiting for completion...");
+ while (true) {
+ boolean existed = ZKUtil.watchAndCheckExists(zkw, transactionNode);
+ if (!existed) {
+ break;
+ } else {
+ System.out.print(".");
+ Thread.sleep(1000);
+ }
+ }
+ System.out.println("");
+ Pair<HRegionInfo, ServerName> mergedRegionLocation = MetaReader
+ .getRegion(catalogTracker, mergedRegionInfo.getRegionName());
+ if (mergedRegionLocation != null) {
+ System.out.println("Complete merging, new region ="
+ + mergedRegionInfo.getRegionNameAsString()
+ + ", assigned on the server " + mergedRegionLocation.getSecond());
+ } else {
+ System.err.println("Merge is cancelled, please see the logs on the master");
+ }
+ }
+ } finally {
+ if (this.catalogTracker != null) {
+ this.catalogTracker.stop();
+ }
+ connection.close();
+ }
+ return 0;
+ }
+
+ private void setForce(boolean force) {
+ this.isForce = force;
+ }
+
+ private void setSync(boolean sync) {
+ this.sync = sync;
+ }
+
+ private void initRegionInfoByEncodedName(CatalogTracker ct)
+ throws IOException {
+ if (tableName != null && (region1Info == null || region2Info == null)) {
+ List<HRegionInfo> tableRegions = MetaReader.getTableRegions(ct,
+ Bytes.toBytes(tableName));
+ for (HRegionInfo regionInfo : tableRegions) {
+ if (regionInfo.getEncodedName().equals(region1EncodedName)) {
+ region1Info = regionInfo;
+ } else if (regionInfo.getEncodedName().equals(region2EncodedName)) {
+ region2Info = regionInfo;
+ }
+ if (region1Info != null && region2Info != null) {
+ this.mergedRegionInfo = HRegion.generateFixedRegionByMerge(this.htd,
+ this.region1Info, this.region2Info);
+ break;
+ }
+ }
+ }
+ }
+
+ static void showMergeRequests() throws Exception {
+ HConnection connection = HConnectionManager.getConnection(HBaseConfiguration.create());
+ try {
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ List<String> mergingRequests = ZKUtil.listChildrenNoWatch(zkw,
+ zkw.mergeZNode);
+ System.out.println("Handling merge requests:");
+ for (String mergeRequest : mergingRequests) {
+ System.out.println(mergeRequest);
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
+ public static void main(String[] args) {
+ int status;
+ OnlineMerge onlineMerge = new OnlineMerge();
+ try {
+ while (true) {
+ if (args.length >= 1 && "-force".equals(args[0])) {
+ String[] newArgs = new String[args.length - 1];
+ for (int i = 1; i < args.length; i++) {
+ newArgs[i - 1] = args[i];
+ }
+ args = newArgs;
+ onlineMerge.setForce(true);
+ } else if (args.length >= 1 && "-async".equals(args[0])) {
+ String[] newArgs = new String[args.length - 1];
+ for (int i = 1; i < args.length; i++) {
+ newArgs[i - 1] = args[i];
+ }
+ args = newArgs;
+ onlineMerge.setSync(false);
+ } else if (args.length >= 1 && "-show".equals(args[0])) {
+ showMergeRequests();
+ System.exit(0);
+ } else {
+ break;
+ }
+ }
+ status = ToolRunner.run(HBaseConfiguration.create(), onlineMerge, args);
+ } catch (Exception e) {
+ LOG.error("exiting due to error", e);
+ status = -1;
+ }
+ System.exit(status);
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java (revision 1439767)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java (working copy)
@@ -36,13 +36,18 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,21 +56,21 @@
import org.apache.hadoop.hbase.util.PairOfSameType;
/**
- * A janitor for the catalog tables. Scans the .META. catalog
- * table on a period looking for unused regions to garbage collect.
+ * A janitor for the catalog tables. Scans the .META. catalog table
+ * on a period looking for unused regions to garbage collect.
*/
@InterfaceAudience.Private
class CatalogJanitor extends Chore {
- private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
+ private static final Log LOG = LogFactory.getLog(CatalogJanitor.class
+ .getName());
private final Server server;
private final MasterServices services;
private AtomicBoolean enabled = new AtomicBoolean(true);
private AtomicBoolean alreadyRunning = new AtomicBoolean(false);
CatalogJanitor(final Server server, final MasterServices services) {
- super(server.getServerName() + "-CatalogJanitor",
- server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000),
- server);
+ super(server.getServerName() + "-CatalogJanitor", server.getConfiguration()
+ .getInt("hbase.catalogjanitor.interval", 300000), server);
this.server = server;
this.services = services;
}
@@ -73,7 +78,8 @@
@Override
protected boolean initialChore() {
try {
- if (this.enabled.get()) scan();
+ if (this.enabled.get())
+ scan();
} catch (IOException e) {
LOG.warn("Failed initial scan of catalog table", e);
return false;
@@ -106,38 +112,118 @@
}
/**
- * Scans META and returns a number of scanned rows, and
- * an ordered map of split parents.
+ * Scans META and returns a number of scanned rows, and an ordered map of
+ * split parents.
+ * @return pair of scanned rows, and map of split parent regioninfos
+ * @throws IOException
*/
Pair<Integer, Map<HRegionInfo, Result>> getSplitParents() throws IOException {
- // TODO: Only works with single .META. region currently. Fix.
+ Pair<Integer, Pair<Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>> scanPair =
+ getMergedRegionsAndSplitParents(null);
+ return new Pair<Integer, Map<HRegionInfo, Result>>(scanPair.getFirst(),
+ scanPair.getSecond().getSecond());
+ }
+
+ /**
+ * Scans META and returns a number of scanned rows, and a map of merged
+ * regions,and an ordered map of split parents.
+ */
+ Pair<Integer, Pair<Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>>
+ getMergedRegionsAndSplitParents() throws IOException {
+ return getMergedRegionsAndSplitParents(null);
+ }
+
+ /**
+ * Scans META and returns a number of scanned rows, and a map of merged
+ * regions, and an ordered map of split parents. if the given table name is
+ * null, return merged regions and split parents of all tables, else only the
+ * specified table
+ * @param tableName null represents all tables
+ * @return pair of scanned rows, and map of merged regions, and map of split
+ * parent regioninfos
+ * @throws IOException
+ */
+ Pair<Integer, Pair<Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>>
+ getMergedRegionsAndSplitParents(final byte[] tableName) throws IOException {
+ final boolean isTableSpecified = (tableName != null && tableName.length != 0);
+ // TODO: Only works with single .META. region currently. Fix.
final AtomicInteger count = new AtomicInteger(0);
- // Keep Map of found split parents. There are candidates for cleanup.
+ // Keep Map of found split parents. There are candidates for cleanup.
// Use a comparator that has split parents come before its daughters.
- final Map<HRegionInfo, Result> splitParents =
- new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
+ final Map<HRegionInfo, Result> splitParents = new TreeMap<HRegionInfo, Result>(
+ new SplitParentFirstComparator());
+ final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
// This visitor collects split parents and counts rows in the .META. table
MetaReader.Visitor visitor = new MetaReader.Visitor() {
@Override
public boolean visit(Result r) throws IOException {
- if (r == null || r.isEmpty()) return true;
+ if (r == null || r.isEmpty())
+ return true;
count.incrementAndGet();
HRegionInfo info = HRegionInfo.getHRegionInfo(r);
- if (info == null) return true; // Keep scanning
- if (info.isSplitParent()) splitParents.put(info, r);
+ if (info == null)
+ return true; // Keep scanning
+ if (isTableSpecified
+ && Bytes.compareTo(info.getTableName(), tableName) > 0) {
+ // Another table, stop scanning
+ return false;
+ }
+ if (info.isSplitParent())
+ splitParents.put(info, r);
+ if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
+ mergedRegions.put(info, r);
+ }
// Returning true means "keep scanning"
return true;
}
};
- // Run full scan of .META. catalog table passing in our custom visitor
- MetaReader.fullScan(this.server.getCatalogTracker(), visitor);
- return new Pair<Integer, Map<HRegionInfo, Result>>(count.get(), splitParents);
+ byte[] startRow = (!isTableSpecified) ? HConstants.EMPTY_START_ROW
+ : HRegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW,
+ HConstants.ZEROES, false);
+ // Run full scan of .META. catalog table passing in our custom visitor with
+ // the start row
+ MetaReader.fullScan(this.server.getCatalogTracker(), visitor, startRow);
+
+ return new Pair<Integer, Pair<Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>>(
+ count.get(),
+ new Pair