### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (working copy) @@ -117,6 +117,10 @@ + regionInfo.getRegionNameAsString() + " but " + "this table is disabled, triggering close of region"); assignmentManager.unassign(regionInfo); + } else if (this.assignmentManager.isRegionInMerging(regionInfo)) { + LOG.debug("Opened region " + regionInfo.getRegionNameAsString() + + " but " + "this region is in merging, triggering close of region"); + assignmentManager.unassign(regionInfo); } } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeTransaction.java (revision 0) @@ -0,0 +1,528 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.catalog.MetaEditor; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.master.CatalogJanitor.SplitParentFirstComparator; +import org.apache.hadoop.hbase.master.MergeManager.MergeTransactionData; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + +/** + * Executes region merge as a "transaction". 
Call {@link #prepare()} to + * checkhether the transaction is available, {@link #offlineRegion()} to offline + * merging regions, {@link #executeMerging()} to merge files on fs, {@link + * #completeMerging(} and {@link #cancelMerging(} to remove useless data. + * + *

+ * This class is not thread safe. Caller needs ensure merge is run by one thread + * only. + */ +public class MergeTransaction { + private static final Log LOG = LogFactory.getLog(MergeTransaction.class); + + private final HTableDescriptor htd; + private final HRegionInfo merge_a; + private final HRegionInfo merge_b; + // Region C merged by regionA and regionB + private HRegionInfo merged_c; + private final MergeTransactionData transactionData; + private final MergeManager mergeManager; + + // Which state does this merge transaction start from + private final JournalState initialState; + // Current state of this merge transaction + protected JournalState currentState; + + private boolean successful = false; + + private final Configuration conf; + private final Path rootDir; + private final FileSystem fs; + + private final MonitoredTask status; + + /** + * Types to add to the transaction journal. Each enum is a state in the merge + * transaction. Used to redo after master restart + */ + enum JournalState { + /** + * Initial state when request a merge + */ + CREATE_MERGE, + /** + * Enter the offline region state after {@link #prepare()} + */ + OFFLINE_REGION, + /** + * Enter the state executing merge after {@link #offlineRegion()} + */ + EXECUTE_MERGING, + /** + * Enter the state completing merging after region merged + */ + COMPLETE_MERGING, + /** + * Enter the state canceling merging if merge transaction is unavailable + */ + CANCEL_MERGING, + /** + * State represent transaction is over + */ + END + } + + public MergeTransaction(final MasterServices masterServices, + final MergeManager mergeManager, + final MergeTransactionData transactionData, final HRegionInfo mergeA, + final HRegionInfo mergeB, final JournalState initialState, + final MonitoredTask status) throws FileNotFoundException, IOException { + this.conf = masterServices.getConfiguration(); + this.rootDir = masterServices.getMasterFileSystem().getRootDir(); + this.fs = 
masterServices.getMasterFileSystem().getFileSystem(); + this.mergeManager = mergeManager; + this.transactionData = transactionData; + this.htd = masterServices.getTableDescriptors().get( + transactionData.getTableName()); + this.merge_a = mergeA; + this.merge_b = mergeB; + this.initialState = initialState; + this.currentState = initialState; + this.status = status; + } + + /** + * Does checks for merge inputs. + * + * Set state to OFFLINE_REGION if the merge is available else set the state to + * CANCEL_MERGING if it is not(e.g. merge region is a parent or has + * reference). + * @throws IOException + * @throws KeeperException + * @throws NoNodeException + */ + void prepare() throws IOException, NoNodeException, KeeperException { + status.setStatus("Preparing..."); + boolean available = true; + if (merge_a.isSplit() || merge_a.isOffline()) { + available = false; + LOG.info("Region A " + merge_a.getRegionNameAsString() + + " is splitted or offline"); + } else if (merge_b.isSplit() || merge_b.isOffline()) { + available = false; + LOG.info("Region B " + merge_b.getRegionNameAsString() + + " is splitted or offline"); + } else if (containsReferenceFile(merge_a)) { + available = false; + LOG.info("Region A " + merge_a.getRegionNameAsString() + + " has reference , cancel the merge"); + } else if (containsReferenceFile(merge_b)) { + available = false; + LOG.info("Region B " + merge_b.getRegionNameAsString() + + " has reference , cancel the merge"); + } else if (!ensureNoParentRegion()) { + available = false; + LOG.info("Exist parent region and can't be cleared, cancel the merge"); + } + if (available) { + this.currentState = JournalState.OFFLINE_REGION; + } else { + this.currentState = JournalState.CANCEL_MERGING; + } + resetJournalStateOnZK(); + } + + /** + * Run the transaction according to the current state + * @param masterServices + * @throws IOException + * @throws KeeperException + * @throws InterruptedException + */ + public void execute(MasterServices 
masterServices) throws IOException, + KeeperException, InterruptedException { + while (!masterServices.isStopped()) { + if (this.currentState.equals(JournalState.END)) { + break; + } + switch (this.currentState) { + case CREATE_MERGE: + prepare(); + break; + case OFFLINE_REGION: + offlineRegion(); + break; + case EXECUTE_MERGING: + executeMerging(masterServices); + break; + case COMPLETE_MERGING: + completeMerging(masterServices); + break; + case CANCEL_MERGING: + cancelMerging(masterServices); + break; + default: + throw new RuntimeException("Unhandled merge state: " + currentState); + } + } + } + + /** + * Offline the two merging regions + * @throws KeeperException + * @throws InterruptedException + */ + void offlineRegion() throws KeeperException, InterruptedException { + status.setStatus("Offline region..."); + mergeManager.unassignMergingRegion(transactionData.getTransactionName(), + this.merge_a); + mergeManager.unassignMergingRegion(transactionData.getTransactionName(), + this.merge_b); + mergeManager.waitUntilRegionOffline(this.merge_a, this.merge_b); + if (mergeManager.isRegionSplit(this.merge_a) + || mergeManager.isRegionSplit(this.merge_b)) { + LOG.info("Canceling merge because region " + + this.merge_a.getEncodedName() + " or " + + this.merge_b.getEncodedName() + " has been splitted"); + this.currentState = JournalState.CANCEL_MERGING; + } else { + this.currentState = JournalState.EXECUTE_MERGING; + } + resetJournalStateOnZK(); + } + + /** + * Merging the files in HDFS, add merged region to META and assign it + * @param masterServices + * @throws NoNodeException + * @throws KeeperException + * @throws IOException + */ + void executeMerging(MasterServices masterServices) throws NoNodeException, + KeeperException, IOException { + status.setStatus("Merging region in fs..."); + this.merged_c = HRegion.generateFixedRegionByMerge(htd, merge_a, merge_b); + Pair mergedRegionInMeta = null; + boolean isMergedRegionSplit = false; + if (this.initialState == 
JournalState.EXECUTE_MERGING) { + // It is a redo from this step + mergedRegionInMeta = MetaReader.getRegion( + masterServices.getCatalogTracker(), merged_c.getRegionName()); + if (mergedRegionInMeta == null) { + isMergedRegionSplit = checkRegionSplit( + masterServices.getCatalogTracker(), merged_c); + LOG.debug("Redo merging " + transactionData + "from " + initialState + + "and check whether merged region" + + merged_c.getRegionNameAsString() + " is split =" + + isMergedRegionSplit); + } + } + if (!isMergedRegionSplit && mergedRegionInMeta == null) { + this.merged_c = HRegion.merge(fs, rootDir, htd, merge_a, merge_b); + MetaEditor.addRegionToMeta(masterServices.getCatalogTracker(), + this.merged_c); + } + status.setStatus("Assigning merged region " + + merged_c.getRegionNameAsString()); + if (!isMergedRegionSplit + && (mergedRegionInMeta == null || mergedRegionInMeta.getSecond() == null)) { + if (mergeManager.isRegionOffline(merged_c)) { + masterServices.getAssignmentManager().assign(this.merged_c, true); + } + } + this.currentState = JournalState.COMPLETE_MERGING; + resetJournalStateOnZK(); + } + + /** + * Delete merging regions from META after completing the merge successfully + * @param masterServices + * @throws KeeperException + * @throws IOException + * @throws InterruptedException + */ + void completeMerging(MasterServices masterServices) throws KeeperException, + IOException, InterruptedException { + status.setStatus("Completing merging... "); + // Remove region from .META. 
+ if (this.merge_a != null) { + MetaEditor.deleteRegion(masterServices.getCatalogTracker(), this.merge_a); + } + if (this.merge_b != null) { + MetaEditor.deleteRegion(masterServices.getCatalogTracker(), this.merge_b); + } + this.mergeManager.waitUntilNoDeadServerInProcess(masterServices); + this.mergeManager.finishMergeTransaction(transactionData, true); + this.currentState = JournalState.END; + this.successful = true; + } + + /** + * Merge is unavailable, cancel it; Assign the merging region if it is offline + * because of merging + * @param masterServices + * @throws NoNodeException + * @throws KeeperException + * @throws InterruptedException + */ + void cancelMerging(MasterServices masterServices) throws NoNodeException, + KeeperException, InterruptedException { + status.setStatus("Canceling merging... "); + this.mergeManager.assignMergingRegionIfOffline(this.merge_a); + this.mergeManager.assignMergingRegionIfOffline(this.merge_b); + this.mergeManager.finishMergeTransaction(transactionData, true); + this.currentState = JournalState.END; + } + + protected void resetJournalStateOnZK() throws NoNodeException, + KeeperException { + mergeManager.setJournalStateOnZK(transactionData.getTransactionName(), + this.currentState); + } + + /** + * Check whether region has Reference file + * @param region + * @return + * @throws IOException + */ + private boolean containsReferenceFile(final HRegionInfo region) + throws IOException { + Path tabledir = new Path(rootDir, region.getTableNameAsString()); + boolean hasReference = false; + for (HColumnDescriptor family : htd.getFamilies()) { + Path p = HStore.getStoreHomedir(tabledir, region.getEncodedName(), + family.getName()); + if (!fs.exists(p)) + continue; + // Look for reference files. Call listStatus with anonymous instance of + // PathFilter. 
+ FileStatus[] ps = FSUtils.listStatus(fs, p, new PathFilter() { + public boolean accept(Path path) { + return StoreFile.isReference(path); + } + }); + + if (ps != null && ps.length > 0) { + hasReference = true; + break; + } + } + return hasReference; + } + + /** + * Between offline region and start merging, make sure there is no parent + * region of the two regions in .META. If exists, fixing up daughters would + * cause daughter regions(we have merged one) online again when we restart + * master, so we clear the parent region in this function to prevent the above + * @return true if there are no parent region of merging regions + * @throws IOException + */ + private boolean ensureNoParentRegion() throws IOException { + Map splitParents = getSplitParents(merge_a + .getTableName()); + for (Map.Entry splitParent : splitParents.entrySet()) { + // check whether its range contains merged region + if (isRangeOverlap(merge_a, splitParent.getKey()) + || isRangeOverlap(merge_b, splitParent.getKey())) { + LOG.warn("Region " + merge_a.getEncodedName() + " or region " + + merge_b.getEncodedName() + " has parent region " + + splitParent.getKey().getRegionNameAsString() + + ", try to clean it"); + if (!this.mergeManager.cleanParent(splitParent.getKey(), + splitParent.getValue())) { + LOG.warn("Failed clean parent region " + + splitParent.getKey().getRegionNameAsString() + + ", canceling merge"); + return false; + } + } + } + + return true; + } + + /** + * Scans META and returns an ordered map of split parents for a specified + * table + * @param tableName + * @return + * @throws IOException + */ + Map getSplitParents(final byte[] tableName) + throws IOException { + // Keep Map of found split parents. There are candidates for cleanup. + // Use a comparator that has split parents come before its daughters. + final Map splitParents = new TreeMap( + new SplitParentFirstComparator()); + // This visitor collects split parents and counts rows in the .META. 
table + MetaScannerVisitor visitor = new MetaScannerVisitorBase() { + @Override + public boolean processRow(Result r) throws IOException { + if (r == null || r.isEmpty()) + return true; + HRegionInfo info = HRegionInfo.getHRegionInfo(r); + if (info == null) + return true; // Keep scanning + if (Bytes.compareTo(info.getTableName(), tableName) > 0) + return false; // Stop scanning + if (info.isSplitParent()) + splitParents.put(info, r); + // Returning true means "keep scanning" + return true; + } + }; + // Run full scan of .META. catalog table passing in our custom visitor + MetaScanner.metaScan(conf, visitor, tableName); + return splitParents; + } + + /** + * Check whether the region is split + * @param checkRegion + * @param result + * @return + * @throws IOException + */ + static boolean checkRegionSplit(final CatalogTracker catalogTracker, + final HRegionInfo checkRegion) throws IOException { + final AtomicBoolean isSplit = new AtomicBoolean(false); + byte[] startRow = checkRegion.getRegionName(); + final byte[] startKey = checkRegion.getStartKey(); + final byte[] endKey = checkRegion.getEndKey(); + final byte[] tableName = checkRegion.getTableName(); + MetaReader.fullScan(catalogTracker, new MetaReader.Visitor() { + @Override + public boolean visit(Result r) throws IOException { + if (r == null || r.isEmpty()) + return true; + HRegionInfo info = HRegionInfo.getHRegionInfo(r); + if (info == null) + return true; // Keep scanning + if (Bytes.compareTo(info.getTableName(), tableName) > 0) + return false;// other table,stop scanning + if (Bytes.compareTo(info.getTableName(), tableName) == 0) { + if (Bytes.compareTo(info.getStartKey(), startKey) == 0) { + if (Bytes.compareTo(info.getEndKey(), endKey) == 0) { + // find itself,keep scanning + return true; + } else { + isSplit.set(true); + return false; + } + } else if (Bytes.compareTo(info.getStartKey(), startKey) > 0) { + return false; + } + } + return true; + } + }, startRow); + return isSplit.get(); + } + + public 
boolean isSuccessful() { + return this.successful; + } + + public HRegionInfo getMergedRegion() { + return this.merged_c; + } + + public JournalState getCurrentState() { + return this.currentState; + } + + /** + * Check whether there is overlap between region A and region B + * @param regionA + * @param regionB + * @return true if regions' range exist overlap + */ + public static boolean isRangeOverlap(HRegionInfo regionA, HRegionInfo regionB) { + // if A's startKey < B's startkey, swap them + if (Bytes.compareTo(regionA.getStartKey(), regionB.getStartKey()) < 0) { + return isRangeOverlap(regionB, regionA); + } else { + if (Bytes.compareTo(regionA.getStartKey(), regionB.getEndKey()) < 0 + || Bytes.equals(regionB.getEndKey(), HConstants.EMPTY_END_ROW)) { + return true; + } else { + return false; + } + } + } + + /** + * Get the bytes for a specified JournalState + * @param state + * @return + */ + static byte[] getBytesFromJournalState(JournalState state) { + return Bytes.toBytes(state.toString()); + + } + + /** + * Get an JournalState from bytes. 
Throws a {@link RuntimeException} if the + * data is unknown bytes + * @param data + * @return + */ + static JournalState parseJournalStateFromBytes(byte[] data) { + String stateString = Bytes.toString(data); + for (JournalState state : JournalState.values()) { + if (state.toString().equals(stateString)) + return state; + } + throw new RuntimeException("Unknown JournalState " + stateString); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (working copy) @@ -102,6 +102,8 @@ protected final Server server; private ServerManager serverManager; + + private MergeManager mergeManager; private CatalogTracker catalogTracker; @@ -1024,6 +1026,11 @@ LOG.info("Opened region " + regionNameStr + "but this table is disabled, triggering close of region"); unassign(regionInfo); + } else if (isRegionInMerging(regionInfo)) { + LOG.debug("Opened region " + + regionInfo.getRegionNameAsString() + " but " + + "this region is in merging, triggering close of region"); + unassign(regionInfo); } } } @@ -2163,6 +2170,8 @@ Map allRegions = MetaReader.fullScan( catalogTracker, disabledOrDisablingOrEnabling, true); if (allRegions == null || allRegions.isEmpty()) return; + // Remove the merging regions + removeMergingRegions(allRegions); // Determine what type of assignment to do on startup boolean retainAssignment = server.getConfiguration(). 
@@ -2715,6 +2724,9 @@ public void handleSplitReport(final ServerName sn, final HRegionInfo parent, final HRegionInfo a, final HRegionInfo b) { regionOffline(parent); + if (this.mergeManager != null) { + this.mergeManager.notifyRegionSplit(parent); + } regionOnline(a, sn); regionOnline(b, sn); @@ -2796,4 +2808,75 @@ } return true; } + + void setMergeManager(MergeManager mergeManager) { + assert this.mergeManager == null; + this.mergeManager = mergeManager; + } + + public MergeManager getMergeManager() { + return this.mergeManager; + } + + public boolean isRegionInMerging(HRegionInfo regionInfo) { + return this.mergeManager != null + && this.mergeManager.isRegionInMerging(regionInfo); + } + + public void offlineMergingRegion(HRegionInfo regionInfo) { + // Region is merging so should not be reassigned, just delete the CLOSED + // node + LOG.debug("Region is in merging so deleting ZK node and removing from " + + "regions in transition, skipping assignment of region " + + regionInfo.getRegionNameAsString()); + try { + if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) { + // Could also be in OFFLINE mode + ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName()); + } + } catch (KeeperException.NoNodeException nne) { + LOG.debug("Tried to delete closed node for " + regionInfo + " but it " + + "does not exist so just offlining"); + } catch (KeeperException e) { + this.server.abort("Error deleting CLOSED node in ZK", e); + } + regionOffline(regionInfo); + if (this.mergeManager != null) { + this.mergeManager.notifyRegionOffline(regionInfo); + } + } + + /** + * Remove the merging regions from the specified map of regions + * @param regions + */ + private void removeMergingRegions(Map regions) { + if (this.mergeManager != null && this.mergeManager.isRegionInMerging()) { + List toRemoveRegions = new ArrayList(); + for (HRegionInfo region : regions.keySet()) { + if (this.mergeManager.isRegionInMerging(region)) { + toRemoveRegions.add(region); + } + } 
+ for (HRegionInfo region : toRemoveRegions) + regions.remove(region); + } + } + + /** + * Get the merging region for the specified list of regions + * @param regions + * @return + */ + public List getMergingRegions(List regions) { + List regionsInMerging = new ArrayList(); + if (this.mergeManager != null && this.mergeManager.isRegionInMerging()) { + for (HRegionInfo region : regions) { + if (this.mergeManager.isRegionInMerging(region)) { + regionsInMerging.add(region); + } + } + } + return regionsInMerging; + } } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java (revision 0) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMergeTransaction.java (revision 0) @@ -0,0 +1,362 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.MergeManager.MergeTransactionData; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.base.Joiner; + +@Category(MediumTests.class) +public class TestMergeTransaction { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Log LOG = LogFactory.getLog(TestMergeTransaction.class); + private static final byte[] FAMILYNAME = Bytes.toBytes("fam"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int INITIAL_REGION_NUM = 4; + private static final int ROWSIZE = 20; + private static final int rowSeperator1 = 5; + private static final int rowSeperator2 = 10; + private static final int rowSeperator3 = 15; + private static byte[][] ROWS = makeN(ROW, 
ROWSIZE); + + // Throw exception in certain step, simulating redo + private static int throwExceptionStep = 0; + + @BeforeClass + public static void beforeAllTests() throws Exception { + TEST_UTIL.getConfiguration().setClass( + "hbase.master.merge.transaction.impl", TestingMergeTransaction.class, + MergeTransaction.class); + TEST_UTIL.getConfiguration().setInt("hbase.master.merge.tries.number", 1); + // Start a cluster of two regionservers. + TEST_UTIL.startMiniCluster(2); + } + + @AfterClass + public static void afterAllTests() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testWholesomeMerge() throws Exception { + System.out.println("Starting testWholesomeMerge"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + byte[] tablename = Bytes.toBytes("testWholesomeMerge"); + HTable table = createTableAndLoadData(master, tablename); + + // Merge 2nd and 3th region + mergeAndVerify(master, tablename, 1, 2, INITIAL_REGION_NUM - 1); + + // Merge 1st and 2nd region + mergeAndVerify(master, tablename, 0, 1, INITIAL_REGION_NUM - 2); + + verifyRowCount(table, ROWSIZE); + + table.close(); + } + + @Test + public void testRedoMergeWhenOfflineRegion() throws Exception { + System.out.println("Starting testRedoMergeFromOfflineRegion"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + byte[] tablename = Bytes.toBytes("testRedoMergeFromOfflineRegion"); + HTable table = createTableAndLoadData(master, tablename); + + // Throw exception when offline region + throwExceptionStep = 1; + requestMergeRegion(master, tablename, 0, 1); + waitAndVerifyResubmitQueueSize(master, 1); + + throwExceptionStep = 5; + master.assignmentManager.getMergeManager().resubmitAtOnce(); + waitAndVerifyResubmitQueueSize(master, 1); + + throwExceptionStep = 0; + master.assignmentManager.getMergeManager().resubmitAtOnce(); + waitAndVerifyRegionNum(master, tablename, 
INITIAL_REGION_NUM - 1); + verifyRowCount(table, ROWSIZE); + + table.close(); + } + + @Test + public void testRedoMergeWhenExecuteMerge() throws Exception { + System.out.println("Starting testRedoMergeFromExecuteMerge"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + byte[] tablename = Bytes.toBytes("testRedoMergeFromExecuteMerge"); + HTable table = createTableAndLoadData(master, tablename); + + // Throw exception when execute merge + throwExceptionStep = 2; + requestMergeRegion(master, tablename, 0, 1); + waitAndVerifyResubmitQueueSize(master, 1); + + throwExceptionStep = 5; + master.assignmentManager.getMergeManager().resubmitAtOnce(); + waitAndVerifyResubmitQueueSize(master, 1); + + throwExceptionStep = 0; + master.assignmentManager.getMergeManager().resubmitAtOnce(); + waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1); + verifyRowCount(table, ROWSIZE); + table.close(); + } + + @Test + public void testRedoMergeWhenCompleteMerge() throws Exception { + System.out.println("Starting testRedoMergeFromCompleteMerge"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + byte[] tablename = Bytes.toBytes("testRedoMergeFromCompleteMerge"); + HTable table = createTableAndLoadData(master, tablename); + + // Throw exception when complete merge + throwExceptionStep = 3; + requestMergeRegion(master, tablename, 0, 1); + waitAndVerifyResubmitQueueSize(master, 1); + throwExceptionStep = 0; + master.assignmentManager.getMergeManager().resubmitAtOnce(); + waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM - 1); + verifyRowCount(table, ROWSIZE); + + table.close(); + } + + @Test + public void testRedoMergeWhenCancelMerge() throws Exception { + System.out.println("Starting testRedoMergeFromCancelMerge"); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HMaster master = cluster.getMaster(); + byte[] tablename = 
Bytes.toBytes("testRedoMergeFromCancelMerge"); + HTable table = createTableAndLoadData(master, tablename); + + // Throw exception when complete merge + throwExceptionStep = 4; + requestMergeRegion(master, tablename, 0, 1); + waitAndVerifyResubmitQueueSize(master, 1); + throwExceptionStep = 0; + master.assignmentManager.getMergeManager().resubmitAtOnce(); + waitAndVerifyRegionNum(master, tablename, INITIAL_REGION_NUM); + verifyRowCount(table, ROWSIZE); + + table.close(); + } + + /** + * An MergeTransaction instance used in this test. + */ + public static class TestingMergeTransaction extends MergeTransaction { + + public TestingMergeTransaction(MasterServices masterServices, + MergeManager mergeManager, MergeTransactionData transactionData, + HRegionInfo mergeA, HRegionInfo mergeB, JournalState initialState, + MonitoredTask status) throws FileNotFoundException, IOException { + super(masterServices, mergeManager, transactionData, mergeA, mergeB, + initialState, status); + } + + @Override + void offlineRegion() throws KeeperException, InterruptedException { + throwExceptionIfTest(1); + super.offlineRegion(); + } + + @Override + void executeMerging(MasterServices masterServices) throws NoNodeException, + KeeperException, IOException { + throwExceptionIfTest(2); + super.executeMerging(masterServices); + } + + @Override + void completeMerging(MasterServices masterServices) throws KeeperException, + IOException, InterruptedException { + throwExceptionIfTest(3); + super.completeMerging(masterServices); + } + + @Override + void cancelMerging(MasterServices masterServices) throws NoNodeException, + KeeperException, InterruptedException { + throwExceptionIfTest(4); + super.cancelMerging(masterServices); + } + + @Override + void prepare() throws IOException, NoNodeException, KeeperException { + super.prepare(); + if (throwExceptionStep == 4) { + super.currentState = JournalState.CANCEL_MERGING; + super.resetJournalStateOnZK(); + } + } + + @Override + protected void 
resetJournalStateOnZK() throws NoNodeException, + KeeperException { + throwExceptionIfTest(5); + super.resetJournalStateOnZK(); + } + + void throwExceptionIfTest(int step) { + if(throwExceptionStep==step){ + throw new RuntimeException("Test"); + } + } + + } + + private HTable createTableAndLoadData(HMaster master, byte[] tablename) + throws Exception { + HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME); + TEST_UTIL.createMultiRegions(TEST_UTIL.getConfiguration(), table, FAMILYNAME, + new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],ROWS[rowSeperator2], ROWS[rowSeperator3] }); + loadData(table); + verifyRowCount(table, ROWSIZE); + + // sleep here is an ugly hack to allow region transitions to finish + long timeout = System.currentTimeMillis() + (15 * 1000); + List> tableRegions; + while (System.currentTimeMillis() < timeout) { + tableRegions = MetaReader.getTableRegionsAndLocations( + master.getCatalogTracker(), Bytes.toString(tablename)); + if (tableRegions.size() == 4) break; + Thread.sleep(250); + } + + + tableRegions = MetaReader.getTableRegionsAndLocations( + master.getCatalogTracker(), Bytes.toString(tablename)); + LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions)); + assertEquals(4, tableRegions.size()); + + return table; + } + + private void loadData(HTable table) throws IOException { + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + put.add(FAMILYNAME, QUALIFIER, Bytes.toBytes(i)); + table.put(put); + } + } + + private void verifyRowCount(HTable table, int expectedRegionNum) + throws IOException { + ResultScanner scanner = table.getScanner(new Scan()); + int rowCount = 0; + while (scanner.next() != null) { + rowCount++; + } + assertEquals(expectedRegionNum, rowCount); + scanner.close(); + } + + + private void mergeAndVerify(HMaster master, byte[] tablename, int regionAnum, + int regionBnum, int expectedRegionNum) throws Exception { + requestMergeRegion(master, tablename, regionAnum, 
regionBnum); + waitAndVerifyRegionNum(master, tablename, expectedRegionNum); + + } + + private void requestMergeRegion(HMaster master, byte[] tablename, + int regionAnum, int regionBnum) throws Exception { + List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader + .getTableRegionsAndLocations(master.getCatalogTracker(), + Bytes.toString(tablename)); + MergeManager.createMergeRequest(master.getZooKeeper(), + Bytes.toString(tablename), tableRegions.get(regionAnum).getFirst() + .getEncodedName(), tableRegions.get(regionBnum).getFirst() + .getEncodedName(), false); + } + + private void waitAndVerifyRegionNum(HMaster master, byte[] tablename, + int expectedRegionNum) throws Exception { + List<Pair<HRegionInfo, ServerName>> tableRegions; + long timeout = System.currentTimeMillis() + (15 * 1000); + while (System.currentTimeMillis() < timeout) { + tableRegions = MetaReader.getTableRegionsAndLocations( + master.getCatalogTracker(), Bytes.toString(tablename)); + if (tableRegions.size() == expectedRegionNum)break; + Thread.sleep(250); + } + + tableRegions = MetaReader.getTableRegionsAndLocations( + master.getCatalogTracker(), Bytes.toString(tablename)); + LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegions)); + assertEquals(expectedRegionNum, tableRegions.size()); + } + + private void waitAndVerifyResubmitQueueSize(HMaster master, int expectedSize) + throws Exception { + int queueSize; + long timeout = System.currentTimeMillis() + (15 * 1000); + while (System.currentTimeMillis() < timeout) { + queueSize = master.getAssignmentManager().getMergeManager().resubmitQueusSize(); + if (queueSize == expectedSize) break; + Thread.sleep(250); + } + + queueSize = master.getAssignmentManager().getMergeManager().resubmitQueusSize(); + assertEquals(expectedSize, queueSize); + } + + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i))); + } + return ret; + } + +} Index: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (working copy) @@ -393,6 +393,11 @@ + " is disabled. Hence not assigning region" + hri.getEncodedName()); return false; } + if (assignmentManager.isRegionInMerging(hri)) { + LOG.info("Region " + hri.getRegionNameAsString() + + " is merging. Hence not assigning it"); + return false; + } return true; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -269,6 +269,9 @@ // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; + // manager of merge transactions + private MergeManager mergeManager; + // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting // operations/debugging. 
@@ -493,6 +496,7 @@ if (this.catalogTracker != null) this.catalogTracker.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); + if (this.mergeManager != null) this.mergeManager.stop(); if (this.fileSystemManager != null) this.fileSystemManager.stop(); this.zooKeeper.close(); } @@ -541,6 +545,9 @@ this.catalogTracker, this.balancer, this.executorService, this.metricsMaster); zooKeeper.registerListenerFirst(assignmentManager); + this.mergeManager = new MergeManager(this, this.zooKeeper); + this.mergeManager.start(); + this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); this.regionServerTracker.start(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (working copy) @@ -137,6 +137,8 @@ .getTableRegionsAndLocations(this.ct, tableName, true); int countOfRegionsInTable = tableRegionsAndLocations.size(); List regions = regionsToAssignWithServerName(tableRegionsAndLocations); + List regionsInMerging = assignmentManager.getMergingRegions(regions); + regions.removeAll(regionsInMerging); int regionsCount = regions.size(); if (regionsCount == 0) { done = true; @@ -266,7 +268,9 @@ } private boolean isDone(final List regions) { - return regions != null && regions.size() >= this.countOfRegionsInTable; + return regions != null + && (regions.size() + assignmentManager.getMergeManager() + .getMergingRegionNumOfTable(tableNameStr)) >= this.countOfRegionsInTable; } } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java =================================================================== --- 
hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0) @@ -0,0 +1,233 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.master.MergeManager; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Utility that can merge any two regions in the same table: adjacent, + * overlapping or disjoint. 
Different from {@link Merge}, it does not need to shut down the + * cluster or disable the table. + * + */ +public class OnlineMerge extends Configured implements Tool { + static final Log LOG = LogFactory.getLog(OnlineMerge.class); + + private String tableName; // Name of table + private String region1EncodedName; // EncodedName of region 1 + private String region2EncodedName; // EncodedName of region 2 + private HRegionInfo region1Info; // Info of region 1 + private HRegionInfo region2Info; // Info of region 2 + private HRegionInfo mergedRegionInfo; // Info of merged region + private HTableDescriptor htd; + private boolean isForce = false; + private boolean sync = true; + private CatalogTracker catalogTracker; + + /** default constructor */ + public OnlineMerge() { + super(); + } + + /** + * @param conf configuration + */ + public OnlineMerge(Configuration conf) { + setConf(conf); + } + + private int parseArgs(String[] args) throws IOException { + GenericOptionsParser parser = new GenericOptionsParser(getConf(), args); + + String[] remainingArgs = parser.getRemainingArgs(); + if (remainingArgs.length != 3) { + usage(); + return -1; + } + tableName = remainingArgs[0]; + region1EncodedName = remainingArgs[1]; + region2EncodedName = remainingArgs[2]; + int status = 0; + if (region1EncodedName.equals(region2EncodedName)) { + LOG.error("Can't merge a region with itself"); + status = -1; + } + return status; + } + + private void usage() { + System.err + .println("Usage: bin/hbase org.apache.hadoop.hbase.util.OnlineMerge [-force] [-async] [-show] <table name> <region1 encoded name> <region2 encoded name>\n"); + } + + @SuppressWarnings("deprecation") + public int run(String[] args) throws Exception { + if (parseArgs(args) != 0) { + return -1; + } + HConnection connection = HConnectionManager.getConnection(getConf()); + ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); + try { + if (sync) { + this.htd = connection.getHTableDescriptor(Bytes.toBytes(tableName)); + if (htd == null) { + System.out.println("Couldn't get table descriptor"); + 
return -1; + } + this.catalogTracker = new CatalogTracker(zkw, getConf(), connection); + this.catalogTracker.start(); + this.initRegionInfoByEncodedName(catalogTracker); + if (this.mergedRegionInfo == null) { + System.out.println("Couldn't get region info by encoded name"); + return -1; + } + } + String transactionName = MergeManager.MergeTransactionData.generateTransactionName(tableName, region1EncodedName, + region2EncodedName, isForce); + System.out.println("Send merge request " + transactionName); + MergeManager.createMergeRequest(zkw, tableName, region1EncodedName, + region2EncodedName, isForce); + if (sync) { + String transactionNode = ZKUtil.joinZNode(zkw.mergeZNode, + transactionName); + System.out.print("Waiting to complete..."); + while (true) { + boolean existed = ZKUtil.watchAndCheckExists(zkw, transactionNode); + if (!existed) { + break; + } else { + System.out.print("."); + Thread.sleep(1000); + } + } + System.out.println(""); + Pair mergedRegionLocation = MetaReader + .getRegion(catalogTracker, mergedRegionInfo.getRegionName()); + if (mergedRegionLocation != null) { + System.out.println("Complete merging, new region =" + + mergedRegionInfo.getRegionNameAsString() + + ", assigned on the server " + mergedRegionLocation.getSecond()); + } else { + System.out.println("Merge is cancelled, please see the logs on the master"); + } + } + } finally { + if (this.catalogTracker != null) { + this.catalogTracker.stop(); + } + connection.close(); + } + return 0; + } + + private void setForce(boolean force) { + this.isForce = force; + } + + private void setSync(boolean sync) { + this.sync = sync; + } + + void initRegionInfoByEncodedName(CatalogTracker ct) + throws IOException { + if (tableName != null && (region1Info == null || region2Info == null)) { + List tableRegions = MetaReader.getTableRegions(ct, + Bytes.toBytes(tableName)); + for (HRegionInfo regionInfo : tableRegions) { + if (regionInfo.getEncodedName().equals(region1EncodedName)) { + region1Info = 
regionInfo; + } else if (regionInfo.getEncodedName().equals(region2EncodedName)) { + region2Info = regionInfo; + } + if (region1Info != null && region2Info != null) { + this.mergedRegionInfo = HRegion.generateFixedRegionByMerge(this.htd, + this.region1Info, this.region2Info); + break; + } + } + } + } + + static void showMergeRequests() throws Exception { + HConnection connection = HConnectionManager.getConnection(HBaseConfiguration.create()); + ZooKeeperWatcher zkw = connection.getZooKeeperWatcher(); + List mergingRequests = ZKUtil.listChildrenNoWatch(zkw, + zkw.mergeZNode); + System.out.println("Handling merge requests:"); + for (String mergeRequest : mergingRequests) { + System.out.println(mergeRequest); + } + connection.close(); + } + + public static void main(String[] args) { + int status; + OnlineMerge onlineMerge = new OnlineMerge(); + try { + while (true) { + if (args.length >= 1 && "-force".equals(args[0])) { + String[] newArgs = new String[args.length - 1]; + for (int i = 1; i < args.length; i++) { + newArgs[i - 1] = args[i]; + } + args = newArgs; + onlineMerge.setForce(true); + } else if (args.length >= 1 && "-async".equals(args[0])) { + String[] newArgs = new String[args.length - 1]; + for (int i = 1; i < args.length; i++) { + newArgs[i - 1] = args[i]; + } + args = newArgs; + onlineMerge.setSync(false); + } else if (args.length >= 1 && "-show".equals(args[0])) { + showMergeRequests(); + System.exit(0); + } else { + break; + } + } + status = ToolRunner.run(HBaseConfiguration.create(), onlineMerge, args); + } catch (Exception e) { + LOG.error("exiting due to error", e); + status = -1; + } + System.exit(status); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working 
copy) @@ -4223,6 +4223,123 @@ return dstRegion; } + /** + * Generate a fixed region info when merging + * @param tabledesc + * @param a + * @param b + * @return + * @throws IOException + */ + public static HRegionInfo generateFixedRegionByMerge( + final HTableDescriptor tabledesc, final HRegionInfo a, final HRegionInfo b) + throws IOException { + if (!a.getTableNameAsString().equals(b.getTableNameAsString())) { + throw new IOException("Regions do not belong to the same table"); + } + // Presume both are of same region type -- i.e. both user or catalog + // table regions. This way can use comparator. + final byte[] startKey = (a.getComparator().matchingRows(a.getStartKey(), 0, + a.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0, + HConstants.EMPTY_BYTE_ARRAY.length) || b.getComparator().matchingRows( + b.getStartKey(), 0, b.getStartKey().length, + HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)) ? HConstants.EMPTY_BYTE_ARRAY + : (a.getComparator().compareRows(a.getStartKey(), 0, + a.getStartKey().length, b.getStartKey(), 0, b.getStartKey().length) <= 0 ? a + .getStartKey() : b.getStartKey()); + final byte[] endKey = (a.getComparator().matchingRows(a.getEndKey(), 0, + a.getEndKey().length, HConstants.EMPTY_BYTE_ARRAY, 0, + HConstants.EMPTY_BYTE_ARRAY.length) || a.getComparator().matchingRows( + b.getEndKey(), 0, b.getEndKey().length, HConstants.EMPTY_BYTE_ARRAY, 0, + HConstants.EMPTY_BYTE_ARRAY.length)) ? HConstants.EMPTY_BYTE_ARRAY : (a + .getComparator().compareRows(a.getEndKey(), 0, a.getEndKey().length, + b.getEndKey(), 0, b.getEndKey().length) <= 0 ? b.getEndKey() : a + .getEndKey()); + + HRegionInfo newRegionInfo = new HRegionInfo(tabledesc.getName(), startKey, + endKey, false, Math.max(a.getRegionId(), b.getRegionId()) + 1); + return newRegionInfo; + } + + /** + * Merge two regions whether they are adjacent or not. 
+ * @param conf + * @param tabledesc + * @param a + * @param b + * @return + * @throws IOException + */ + public static HRegionInfo merge(final FileSystem fs, final Path rootDir, + final HTableDescriptor tabledesc, final HRegionInfo a, final HRegionInfo b) + throws IOException { + if (!a.getTableNameAsString().equals(b.getTableNameAsString())) { + throw new IOException("Regions do not belong to the same table"); + } + + Path tableDir = HTableDescriptor.getTableDir(rootDir, a.getTableName()); + HRegionInfo newRegionInfo = generateFixedRegionByMerge(tabledesc, a, b); + String newRegionEncodedName = newRegionInfo.getEncodedName(); + Path newRegionDir = HRegion.getRegionDir(tableDir, newRegionEncodedName); + if (!fs.exists(newRegionDir)) { + fs.mkdirs(newRegionDir); + } + + LOG.info("Starting merge of regions on fs : " + a.getRegionNameAsString() + + " and " + b.getRegionNameAsString() + " into new region " + + newRegionInfo.getRegionNameAsString() + " with start key <" + + Bytes.toStringBinary(newRegionInfo.getStartKey()) + "> and end key <" + + Bytes.toStringBinary(newRegionInfo.getEndKey()) + ">"); + + // Move HStoreFiles under new region directory + Map> byFamily = new TreeMap>( + Bytes.BYTES_COMPARATOR); + HRegionInfo[] regionInfos = new HRegionInfo[] { a, b }; + for (HRegionInfo hri : regionInfos) { + for (HColumnDescriptor family : tabledesc.getFamilies()) { + Path familyDir = HStore.getStoreHomedir(tableDir, hri.getEncodedName(), + family.getName()); + FileStatus files[] = fs.listStatus(familyDir); + if (files == null || files.length == 0) { + continue; + } else { + List familyFileList = byFamily.get(family.getName()); + if (familyFileList == null) { + familyFileList = new ArrayList(); + byFamily.put(family.getName(), familyFileList); + } + for (int i = 0; i < files.length; i++) { + if (StoreFile.isReference(files[i].getPath())) { + throw new IOException("Existed reference file " + + files[i].getPath()); + } + familyFileList.add(files[i].getPath()); + } + } + } 
+ } + + for (Map.Entry> es : byFamily.entrySet()) { + byte[] colFamily = es.getKey(); + makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily); + // Because we compacted the source regions we should have no more than two + // HStoreFiles per family and there will be no reference store + List srcFiles = es.getValue(); + for (Path hsf : srcFiles) { + fs.rename(hsf, StoreFile.getUniqueFile(fs, HStore.getStoreHomedir( + tableDir, newRegionInfo.getEncodedName(), colFamily))); + } + } + // delete out the 'A' region + HFileArchiver.archiveRegion(fs, a); + // delete out the 'B' region + HFileArchiver.archiveRegion(fs, b); + + LOG.info("Merge completed on the fs. New region is " + + newRegionInfo.getRegionNameAsString()); + return newRegionInfo; + } + /* * Fills a map with a vector of store files keyed by column family. * @param byFamily Map to fill. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeManager.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeManager.java (revision 0) @@ -0,0 +1,721 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.master.MergeTransaction.JournalState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKTable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Receive merge request via zk, manage merge transaction and merging region + * list + */ +public class MergeManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(MergeManager.class); + + private MasterServices master; + private final AssignmentManager assignmentManager; + private final CatalogJanitor catalogJanitor; + private final Configuration conf; + + // A map used to store the regions which are merging; A 
region in merging + // means it's disabled and won't online anywhere + private final ConcurrentHashMap mergingRegions; + // Map of region which are already split; Only add to this set if it is in + // mergingRegionSet + private final ConcurrentHashMap splitRegions; + + // Map of merge transaction in processing + private final ConcurrentHashMap processingMergeTransaction; + + // Queue of merge transactions which are waiting for resubmit + private final BlockingQueue resubmitQueue; + + /** Resubmit thread schedule pool */ + private ScheduledExecutorService resubmitSchedulePool; + // 60s + private final int resubmitThreadPeriod = 60; + + // Used for maps' value and wait signal + private final Object PRESENT_OBJECT = new Object(); + + private final int timePerSleep = 10; + // 120s + private static final int DEFAULT_TIME_OUT = 120000; + private final int timeout; + /** + * If it is the first time we called {@link MergeManager#fetchMergeRequest()}, + * we should make sure old merge transactions handled first + */ + private boolean isFirstFetchRequest = true; + + // Thread pool used to run merge request + private ExecutorService mergesPool; + private final int mergeThreads; + + /** + * Constructs a new merge manager. 
+ * @param master + * @param catalogJanitor + * @param watcher + * @throws KeeperException + * @throws IOException + */ + public MergeManager(final MasterServices master, ZooKeeperWatcher watcher) + throws KeeperException, IOException { + super(watcher); + this.master = master; + this.conf = master.getConfiguration(); + this.timeout = conf.getInt("hbase.master.merge.wait.timeout", + DEFAULT_TIME_OUT); + this.catalogJanitor = new CatalogJanitor(master, master); + this.assignmentManager = master.getAssignmentManager(); + this.mergingRegions = new ConcurrentHashMap(); + this.splitRegions = new ConcurrentHashMap(); + this.processingMergeTransaction = new ConcurrentHashMap(); + this.resubmitQueue = new LinkedBlockingQueue(); + this.mergeThreads = master.getConfiguration().getInt( + "hbase.master.thread.merge", 1); + this.assignmentManager.setMergeManager(this); + + } + + /** + * Submit the merge request to the pool + * @param transactionData + * @param initialState + * @throws IOException + */ + private void requestMerge(MergeTransactionData transactionData, + JournalState initialState) { + try { + this.mergesPool.execute(new MergeRequest(this, transactionData, + initialState, master)); + if (LOG.isDebugEnabled()) { + LOG.debug("Merge requested for " + transactionData + ",initialState= " + + initialState); + } + } catch (RejectedExecutionException ree) { + LOG.info("Could not execute merge for " + transactionData, ree); + } + } + + /** + * Handle the merge transaction data from ZK + * @param transactionData + * @throws KeeperException + * @throws IOException + */ + synchronized private void handleMergeTransactionData( + MergeTransactionData transactionData) throws KeeperException { + String regionA = transactionData.getRegionA(); + String regionB = transactionData.getRegionB(); + if (mergingRegions.containsKey(regionA) + || mergingRegions.containsKey(regionB)) { + LOG.warn("Merge transaction " + transactionData + + " try to merge region where other transaction 
contains"); + finishMergeTransaction(transactionData, false); + } else if (regionA.equals(regionB)) { + LOG.warn("Error merge request:" + transactionData + + ",shouldn't merge the same region"); + finishMergeTransaction(transactionData, false); + } else { + String transactionNode = ZKUtil.joinZNode(watcher.mergeZNode, + transactionData.getTransactionName()); + byte[] data = ZKUtil.getData(watcher, transactionNode); + JournalState initialState = MergeTransaction + .parseJournalStateFromBytes(data); + mergingRegions.put(regionA, PRESENT_OBJECT); + mergingRegions.put(regionB, PRESENT_OBJECT); + requestMerge(transactionData, initialState); + } + } + + HRegionInfo getRegionInfoByEncodedName(String tableName, + String regionEncodeName) throws IOException { + RegionState regionState = this.assignmentManager.getRegionStates() + .getRegionState(regionEncodeName); + if (regionState != null) { + return regionState.getRegion(); + } + List tableRegions = MetaReader.getTableRegions( + this.master.getCatalogTracker(), Bytes.toBytes(tableName)); + for (HRegionInfo regionInfo : tableRegions) { + if (regionInfo.getEncodedName().equals(regionEncodeName)) { + return regionInfo; + } + } + return null; + } + + /** + * Starts the thread pool and the tracking of merge request. 
+ * + * @throws KeeperException + * @throws IOException + */ + public void start() throws KeeperException, IOException { + this.mergesPool = Executors.newFixedThreadPool( + mergeThreads, + new ThreadFactoryBuilder() + .setNameFormat("MergeManager-Merge-Thread #%d").setDaemon(true) + .build()); + + this.resubmitSchedulePool = Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setNameFormat("MergeManager-Resubmit-Thread #%d").setDaemon(true) + .build()); + resubmitSchedulePool.scheduleAtFixedRate(new ResubmitThread(this, + this.resubmitQueue), resubmitThreadPeriod, resubmitThreadPeriod, + TimeUnit.SECONDS); + watcher.registerListener(this); + fetchMergeRequest(); + } + + /** + * Shutdown the threadpool + */ + public void stop() { + if (mergesPool != null) { + this.mergesPool.shutdown(); + } + if (this.resubmitSchedulePool != null) { + this.resubmitSchedulePool.shutdown(); + } + } + + /** + * Thread used to resubmit failed merge transaction + */ + static class ResubmitThread extends Thread { + MergeManager mergeManager; + BlockingQueue resubmitQueue; + + public ResubmitThread(MergeManager mergeManager, + BlockingQueue resubmitQueue) { + super("MergeManager.ResubmitThread"); + this.mergeManager = mergeManager; + this.resubmitQueue = resubmitQueue; + setDaemon(true); + } + + @Override + public void run() { + MergeTransactionData transactionData = null; + try { + while ((transactionData = resubmitQueue.poll()) != null) { + String transactionNode = ZKUtil.joinZNode( + mergeManager.watcher.mergeZNode, + transactionData.getTransactionName()); + byte[] data = ZKUtil.getData(mergeManager.watcher, transactionNode); + JournalState initialState = MergeTransaction + .parseJournalStateFromBytes(data); + mergeManager.requestMerge(transactionData, initialState); + } + } catch (KeeperException e) { + LOG.warn("Failed resubmit transaction " + transactionData, e); + if (transactionData != null) { + mergeManager.addToResubmitQueue(transactionData); + } + } + } + } + 
+ /** + * Fetch the merge request via zk + * @throws KeeperException + * @throws IOException + */ + synchronized private void fetchMergeRequest() throws KeeperException { + List mergeTransactions = ZKUtil.listChildrenAndWatchForNewChildren( + watcher, watcher.mergeZNode); + if (isFirstFetchRequest) { + isFirstFetchRequest = false; + List oldMergeTransactions = new ArrayList(); + List newMergeTransactions = new ArrayList(); + for (String transactionName : mergeTransactions) { + String transactionNode = ZKUtil.joinZNode(watcher.mergeZNode, + transactionName); + byte[] data = ZKUtil.getData(watcher, transactionNode); + JournalState initialState = MergeTransaction + .parseJournalStateFromBytes(data); + if (initialState != JournalState.CREATE_MERGE) { + oldMergeTransactions.add(transactionName); + } else { + newMergeTransactions.add(transactionName); + } + } + mergeTransactions.clear(); + mergeTransactions.addAll(oldMergeTransactions); + mergeTransactions.addAll(newMergeTransactions); + } + for (String transactionName : mergeTransactions) { + if (!MergeTransactionData.isValidTransactionName(transactionName)) { + LOG.warn("Invalid merge transaction name=" + transactionName + + ", clean it"); + ZKUtil.deleteNodeRecursively(watcher, + ZKUtil.joinZNode(watcher.mergeZNode, transactionName)); + } else if (this.processingMergeTransaction.containsKey(transactionName)) { + continue; + } else { + MergeTransactionData transactionData = new MergeTransactionData( + transactionName); + this.processingMergeTransaction.put(transactionName, transactionData); + handleMergeTransactionData(transactionData); + } + } + } + + /** + * Finish the merge transaction: 1.delete the transaction zk node; 2.remove + * the region in set if it is called after we have submit the request + * @param transactionData + * @param afterSubmit true if it is called by the MergeRequest thread + * @throws KeeperException + */ + void finishMergeTransaction(MergeTransactionData transactionData, + boolean afterSubmit) 
throws KeeperException { + ZKUtil.deleteNodeRecursively( + watcher, + ZKUtil.joinZNode(watcher.mergeZNode, + transactionData.getTransactionName())); + this.processingMergeTransaction + .remove(transactionData.getTransactionName()); + if (afterSubmit) { + String regionA = transactionData.getRegionA(); + String regionB = transactionData.getRegionB(); + this.mergingRegions.remove(regionA); + this.mergingRegions.remove(regionB); + this.splitRegions.remove(regionA); + this.splitRegions.remove(regionB); + } + } + + /** + * Add the resubmit queue, and wait for resubmit merge request after an + * exception + * @param transactionData + */ + void addToResubmitQueue(MergeTransactionData transactionData) { + this.resubmitQueue.add(transactionData); + } + + /** + * Used in test + * @return + */ + int resubmitQueusSize() { + return this.resubmitQueue.size(); + } + + /** + * Used in test + */ + void resubmitAtOnce() { + new ResubmitThread(this, this.resubmitQueue).run(); + } + + /** + * Unassign the merging region + * @param transactionName + * @param regionInfo + */ + void unassignMergingRegion(String transactionName, HRegionInfo regionInfo) { + assignmentManager.unassign(regionInfo); + } + + /** + * Assign the merging region if it is offline, called when cancel the merge + * @param regionInfo + */ + void assignMergingRegionIfOffline(HRegionInfo regionInfo) { + String tableName = regionInfo.getTableNameAsString(); + ZKTable zkTable = assignmentManager.getZKTable(); + if (zkTable.isTablePresent(tableName) + && !zkTable.isDisablingOrDisabledTable(tableName)) { + if (isRegionOffline(regionInfo) && !isRegionSplit(regionInfo)) { + assignmentManager.assign(regionInfo, true); + } + } + this.mergingRegions.remove(regionInfo.getEncodedName()); + } + + /** + * Assign the merged region if it is offline, called after add merged region + * to META + * @param regionInfo + * @throws InterruptedException + */ + void assignMergedRegionIfOffline(HRegionInfo mergedRegionInfo) + throws 
InterruptedException { + String tableName = mergedRegionInfo.getTableNameAsString(); + ZKTable zkTable = assignmentManager.getZKTable(); + waitUntilTableNotInEnabling(zkTable, tableName); + if (zkTable.isTablePresent(tableName) + && !zkTable.isDisablingOrDisabledTable(tableName)) { + if (isRegionOffline(mergedRegionInfo)) { + assignmentManager.assign(mergedRegionInfo, true); + } + } + } + + /** + * Wait until two regions are offline + * @param regionA + * @param regionB + * @throws InterruptedException + */ + void waitUntilRegionOffline(HRegionInfo regionA, HRegionInfo regionB) + throws InterruptedException { + long expireTime = System.currentTimeMillis() + this.timeout; + while (!this.master.isStopped() && System.currentTimeMillis() < expireTime) { + if (isRegionOffline(regionA) && isRegionOffline(regionB)) { + return; + } else { + synchronized (PRESENT_OBJECT) { + PRESENT_OBJECT.wait(timePerSleep); + } + } + } + throw new RuntimeException("Timeout waiting region offline"); + } + + /** + * Wait until master is initialized + * @throws InterruptedException + */ + void waitUntilMasterInitialized() throws InterruptedException { + while (!this.master.isStopped()) { + if (((HMaster) this.master).isInitialized()) { + return; + } else { + Thread.sleep(timePerSleep); + } + } + throw new RuntimeException("Timeout waiting master initialized"); + } + + /** + * Wait until no dead server in process + * @param masterServices + * @throws InterruptedException + */ + void waitUntilNoDeadServerInProcess(MasterServices masterServices) + throws InterruptedException { + while (!this.master.isStopped()) { + if (!masterServices.getServerManager().areDeadServersInProgress()) { + return; + } else { + Thread.sleep(timePerSleep); + } + } + throw new RuntimeException("Timeout waiting no deadserver in process"); + } + + /** + * Wait until specified table not in enabling state + * @param zkTable + * @param tableName + * @throws InterruptedException + */ + void 
waitUntilTableNotInEnabling(ZKTable zkTable, String tableName) + throws InterruptedException { + long expireTime = System.currentTimeMillis() + this.timeout; + while (!this.master.isStopped() && System.currentTimeMillis() < expireTime) { + if (!zkTable.isEnablingTable(tableName)) { + return; + } else { + Thread.sleep(timePerSleep); + } + } + throw new RuntimeException("Timeout waiting table not in enabling"); + } + + /** + * Check if the merging region is split + * @param regionInfo + * @return + */ + boolean isRegionSplit(HRegionInfo regionInfo) { + if (regionInfo.isSplitParent()) { + return true; + } + if (this.splitRegions.containsKey(regionInfo.getEncodedName())) { + return true; + } + return false; + } + + /** + * Check if the merging region is offline + * @param regionInfo + * @return + */ + boolean isRegionOffline(HRegionInfo regionInfo) { + if (!this.assignmentManager.getRegionStates().isRegionInTransition(regionInfo) + && this.assignmentManager.getRegionStates().getRegionServerOfRegion(regionInfo) == null) { + return true; + } + return false; + } + + /** + * Record the transaction's JournalState on the ZK + * @param transactionName + * @param state + * @throws NoNodeException + * @throws KeeperException + */ + void setJournalStateOnZK(String transactionName, JournalState state) + throws NoNodeException, KeeperException { + String transactionNode = ZKUtil.joinZNode(watcher.mergeZNode, + transactionName); + ZKUtil.setData(watcher, transactionNode, + MergeTransaction.getBytesFromJournalState(state)); + } + + /** + * Clear parent region of merging regions in prepare step + * @param parent + * @param rowContent + * @return + * @throws IOException + */ + boolean cleanParent(final HRegionInfo parent, Result rowContent) + throws IOException { + return this.catalogJanitor.cleanParent(parent, rowContent); + } + + /** + * Check whether a region is in merging, if true it shouldn't be assigned + * @param regionInfo + * @return true if region in the merging + */ + 
boolean isRegionInMerging(final HRegionInfo regionInfo) { + return this.mergingRegions.containsKey(regionInfo.getEncodedName()); + } + + /** + * Check if any regions are merging + * @return true if any regions in merging + */ + boolean isRegionInMerging() { + return !this.mergingRegions.isEmpty(); + } + + public Set getMergingRegions() { + Set clone = new HashSet(this.mergingRegions.size()); + clone.addAll(this.mergingRegions.keySet()); + return clone; + } + + /** + * Get the merging region num for the specified table + * @param tableName + * @return + */ + public int getMergingRegionNumOfTable(String tableName) { + int num = 0; + for (MergeTransactionData transactionData : processingMergeTransaction + .values()) { + if (tableName.equals(transactionData.getTableName())) { + num += 2; + } + } + return num; + } + + /** + * AssignmentManager notify MergeManager one region is offline + * @param regionInfo + */ + void notifyRegionOffline(final HRegionInfo regionInfo) { + if (this.mergingRegions.containsKey(regionInfo.getEncodedName())) { + synchronized (PRESENT_OBJECT) { + PRESENT_OBJECT.notifyAll(); + } + } + } + + /** + * AssignmentManager notify MergeManager one region is split + * @param regionInfo + */ + void notifyRegionSplit(final HRegionInfo regionInfo) { + if (this.mergingRegions.containsKey(regionInfo.getEncodedName())) { + this.splitRegions.put(regionInfo.getEncodedName(), PRESENT_OBJECT); + synchronized (PRESENT_OBJECT) { + PRESENT_OBJECT.notifyAll(); + } + } + } + + // ZooKeeper events + + /** + * New transaction node has been created or deleted. + *

+ * When this happens we must: + *

    + *
  1. Find the new merge request and handle it
  2. + *
+ */ + @Override + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.mergeZNode)) { + try { + LOG.debug("Merge reuest arrived or completed "); + fetchMergeRequest(); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception handling merge requests", e); + } + } + } + + public static class MergeTransactionData { + public static final String SEPARATOR = "#"; + private final String transactionName; + private final String tableName; + private final String regionA; + private final String regionB; + private final boolean isForce; + + MergeTransactionData(String transactionName) { + this.transactionName = transactionName; + String[] elements = transactionName.split(SEPARATOR); + this.tableName = elements[0]; + this.regionA = elements[1]; + this.regionB = elements[2]; + this.isForce = Boolean.parseBoolean(elements[3]); + } + + String getTransactionName() { + return this.transactionName; + } + + String getTableName() { + return this.tableName; + } + + String getRegionA() { + return this.regionA; + } + + String getRegionB() { + return this.regionB; + } + + boolean isForce() { + return this.isForce; + } + + @Override + public String toString() { + return "table=" + tableName + ", regionA=" + regionA + ", regionB=" + + regionB + ", force=" + isForce; + } + + /** + * Valid TransactionName=tableName+'#'+AEncodedName+"#"+BEncodedName+"#"+ + * "true/false" + * @param transactionName + * @return + */ + public static boolean isValidTransactionName(String transactionName) { + String[] elements = transactionName.split(SEPARATOR); + if (elements.length != 4) { + return false; + } + if (elements[1].length() != elements[2].length()) { + return false; + } + if (elements[1].equals(elements[2])) { + return false; + } + if (!elements[3].equals("true") && !elements[3].equals("false")) { + return false; + } + return true; + } + + public static String generateTransactionName(String tableName, + String regionAEncodedName, String regionBEncodedName, boolean 
force) { + return tableName + SEPARATOR + regionAEncodedName + SEPARATOR + + regionBEncodedName + SEPARATOR + force; + } + } + + /** + * Create a merge request via zk.Set force true if merge two non-adjacent + * regions, else the request will be skipped + * @param zkw + * @param tableName + * @param regionAEncodedName + * @param regionBEncodedName + * @param force if it is false, only merge two adjacent regions + * @throws KeeperException + */ + public static void createMergeRequest(ZooKeeperWatcher zkw, String tableName, + String regionAEncodedName, String regionBEncodedName, boolean force) + throws KeeperException { + String transactionName = MergeTransactionData.generateTransactionName( + tableName, regionAEncodedName, regionBEncodedName, force); + if (MergeTransactionData.isValidTransactionName(transactionName)) { + String transactionNode = ZKUtil + .joinZNode(zkw.mergeZNode, transactionName); + ZKUtil.createAndNoWatch(zkw, transactionNode, + MergeTransaction.getBytesFromJournalState(JournalState.CREATE_MERGE)); + } else { + throw new IllegalArgumentException("Unknown merge parameters, table=" + + tableName + ", regionA=" + regionAEncodedName + ", regionB=" + + regionBEncodedName); + } + + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (revision 1424296) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (working copy) @@ -92,6 +92,11 @@ @Override public void process() { LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName()); + // Check if thie region is in merging or not + if (this.assignmentManager.isRegionInMerging(this.regionInfo)) { + assignmentManager.offlineMergingRegion(regionInfo); + return; + } // Check if this table is being disabled or not if 
(this.assignmentManager.getZKTable().
        isDisablingOrDisabledTable(this.regionInfo.getTableNameAsString())) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java	(revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java	(working copy)
@@ -103,6 +103,8 @@
   public String splitLogZNode;
   // znode containing the state of the load balancer
   public String balancerZNode;
+  // znode used for merge
+  public String mergeZNode;
 
   // Certain ZooKeeper nodes need to be world-readable
   public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE =
@@ -166,6 +168,7 @@
       ZKUtil.createAndFailSilent(this, tableZNode);
       ZKUtil.createAndFailSilent(this, splitLogZNode);
       ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
+      ZKUtil.createAndFailSilent(this, mergeZNode);
     } catch (KeeperException e) {
       throw new ZooKeeperConnectionException(
           prefix("Unexpected KeeperException creating base node"), e);
@@ -215,6 +218,8 @@
         conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
     balancerZNode = ZKUtil.joinZNode(baseZNode,
         conf.get("zookeeper.znode.balancer", "balancer"));
+    mergeZNode = ZKUtil.joinZNode(baseZNode,
+        conf.get("zookeeper.znode.merge", "merge"));
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java	(revision 1424296)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java	(working copy)
@@ -1043,6 +1043,35 @@
   }
 
   /**
+   * Creates the specified node with the specified data. Does not set a watch.
+   *
+   * <p>
+   * Throws an exception if the node already exists.
+   *
+   * <p>
+   * The node created is persistent and open access.
+   *
+   * <p>
+   * This method is void; it does not report the created node's version.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to create
+   * @param data data of node to create
+   * @throws KeeperException if unexpected zookeeper exception
+   * @throws KeeperException.NodeExistsException if node already exists
+   */
+  public static void createAndNoWatch(ZooKeeperWatcher zkw, String znode,
+      byte[] data) throws KeeperException, KeeperException.NodeExistsException {
+    try {
+      waitForZKConnectionIfAuthenticating(zkw);
+      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
+          CreateMode.PERSISTENT);
+    } catch (InterruptedException e) {
+      zkw.interruptedException(e);
+    }
+  }
+
+  /**
    * Async creates the specified node with the specified data.
    *
    * <p>
Throws an exception if the node already exists.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeRequest.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeRequest.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MergeRequest.java	(revision 0)
@@ -0,0 +1,238 @@
/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.master.MergeManager.MergeTransactionData; +import org.apache.hadoop.hbase.master.MergeTransaction.JournalState; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import com.google.common.base.Preconditions; + +/** + * Handles processing region merges. Put in a queue, owned by MergeManager. + */ +class MergeRequest implements Runnable { + static final Log LOG = LogFactory.getLog(MergeRequest.class); + + private final MergeTransactionData transactionData; + private JournalState initialState; + private final MergeManager mergeManager; + private final MasterServices masterServices; + + private static Class mergeTransactionClass; + + MergeRequest(final MergeManager mergeManager, + final MergeTransactionData transactionData, JournalState initialState, + MasterServices masterServices) { + Preconditions.checkNotNull(masterServices); + this.transactionData = transactionData; + this.initialState = initialState; + this.mergeManager = mergeManager; + this.masterServices = masterServices; + } + + @Override + public void run() { + if (this.masterServices.isStopped()) { + LOG.debug("Skipping merge because server is stopped=" + + this.masterServices.isStopped()); + return; + } + MonitoredTask status = TaskMonitor.get().createStatus( + "Merging " + transactionData + " with initialState=" + initialState); + try { + mergeManager.waitUntilMasterInitialized(); + HRegionInfo regionInfoA = this.mergeManager.getRegionInfoByEncodedName( + 
transactionData.getTableName(), transactionData.getRegionA()); + HRegionInfo regionInfoB = this.mergeManager.getRegionInfoByEncodedName( + transactionData.getTableName(), transactionData.getRegionB()); + if ((regionInfoA == null || regionInfoB == null)) { + if (!masterServices.getAssignmentManager().getZKTable() + .isTablePresent(transactionData.getTableName())) { + String msg = "Skip merging " + transactionData + + ", because table is not present now"; + LOG.info(msg); + status.abort(msg); + skipRequest(regionInfoA, regionInfoB); + return; + } else if (this.initialState == JournalState.CREATE_MERGE) { + String msg = "Skip merging " + transactionData + + ", because couldn't get the regioninfo, initialState=" + + initialState; + LOG.info(msg); + status.abort(msg); + skipRequest(regionInfoA, regionInfoB); + return; + } else if (this.initialState != JournalState.COMPLETE_MERGING) { + String msg = "Resubmit merging, because couldn't get the regioninfo, and initialState=" + + initialState; + LOG.warn(msg); + status.abort(msg); + mergeManager.addToResubmitQueue(transactionData); + return; + } + } else { + if (!transactionData.isForce() && !isAdjacent(regionInfoA, regionInfoB)) { + String msg = "Skip merging " + transactionData + + ", because two region is not adjacent, regionA=" + + regionInfoA.getRegionNameAsString() + ",regionB=" + + regionInfoB.getRegionNameAsString(); + LOG.info(msg); + status.abort(msg); + skipRequest(regionInfoA, regionInfoB); + return; + } + } + status.setStatus("Staring merging "); + LOG.info("Start merging " + transactionData + ", initialState=" + + initialState); + MergeTransaction mt = null; + int triesNumber = masterServices.getConfiguration().getInt( + "hbase.master.merge.tries.number", 5); + for (int i = 0; i < triesNumber; i++) { + try { + mt = createMergeTransactionInstance( + masterServices.getConfiguration(), masterServices, mergeManager, + transactionData, regionInfoA, regionInfoB, initialState, status); + 
mt.execute(this.masterServices); + break; + } catch (IOException e) { + if (i == triesNumber - 1) { + throw e; + } + LOG.warn("Failed executing merge, " + transactionData + ", retry=" + + i, e); + if (mt != null) { + this.initialState = mt.getCurrentState(); + } + } + } + HRegionInfo mergedRegion=mt.getMergedRegion(); + String msg = "Complete merging "+ transactionData + + ", successful=" + mt.isSuccessful() + + (mergedRegion == null ? "" : ", merged region=" + + mergedRegion.getRegionNameAsString()); + LOG.info(msg); + status.markComplete(msg); + } catch (InterruptedException e) { + String msg = "Merging" + transactionData + "interrupted by user "; + LOG.warn(msg, e); + status.abort(msg); + } catch (IOException e) { + String msg = "IOException in merging " + transactionData; + LOG.warn(msg, e); + status.abort(msg); + if (masterServices.getMasterFileSystem().checkFileSystem()) { + this.mergeManager.addToResubmitQueue(transactionData); + } + } catch (KeeperException e) { + String msg = "KeeperException in merging " + transactionData; + LOG.error(msg, e); + status.abort(msg); + masterServices.abort(msg, e); + } catch (Throwable e) { + String msg = "Unknown Exception in merging " + transactionData; + LOG.error(msg, e); + status.abort(msg); + this.mergeManager.addToResubmitQueue(transactionData); + } finally { + status.cleanup(); + } + } + + private void skipRequest(HRegionInfo regionInfoA, HRegionInfo regionInfoB) + throws InterruptedException, KeeperException { + if(regionInfoA!=null) this.mergeManager.assignMergingRegionIfOffline(regionInfoA); + if(regionInfoB!=null) this.mergeManager.assignMergingRegionIfOffline(regionInfoB); + this.mergeManager.finishMergeTransaction(transactionData, true); + } + + /** + * Check whether two regions are adjacent + * @param regionA + * @param regionB + * @return true if two regions are adjacent + */ + public static boolean isAdjacent(HRegionInfo regionA, HRegionInfo regionB) { + HRegionInfo a = regionA; + HRegionInfo b = regionB; + 
if (Bytes.compareTo(a.getStartKey(), b.getStartKey()) > 0) { + a = regionB; + b = regionA; + } + if (Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0) { + return true; + } + return false; + } + + /** + * Utility for constructing an instance of the passed MergeTransaction class. + * @param conf + * @param masterServices + * @param mergeManager + * @param transactionData + * @param mergeA + * @param mergeB + * @param initialState + * @return instance of MergeTransaction class + */ + static MergeTransaction createMergeTransactionInstance(final Configuration conf, + final MasterServices masterServices, final MergeManager mergeManager, + final MergeTransactionData transactionData, final HRegionInfo mergeA, + final HRegionInfo mergeB, final JournalState initialState, + final MonitoredTask status) { + if (mergeTransactionClass == null) { + mergeTransactionClass = conf.getClass( + "hbase.master.merge.transaction.impl", MergeTransaction.class, + MergeTransaction.class); + } + try { + Constructor c = mergeTransactionClass + .getConstructor(MasterServices.class, MergeManager.class, + MergeTransactionData.class, HRegionInfo.class, HRegionInfo.class, + JournalState.class, MonitoredTask.class); + return c.newInstance(masterServices, mergeManager, transactionData, + mergeA, mergeB, initialState, status); + } catch (InvocationTargetException ite) { + Throwable target = ite.getTargetException() != null ? ite + .getTargetException() : ite; + if (target.getCause() != null) + target = target.getCause(); + throw new RuntimeException("Failed construction of MergeTransaction: " + + mergeTransactionClass.toString(), target); + } catch (Exception e) { + throw new RuntimeException("Failed construction of MergeTransaction: " + + mergeTransactionClass.toString() + + ((e.getCause() != null) ? e.getCause().getMessage() : ""), e); + } + } + +}