-ROOT-
diff --git a/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index 9420505..2c52a07 100644
--- a/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ b/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -61,11 +61,11 @@ implements WritableComparable
* **NOTE**
*
@@ -75,8 +75,8 @@ implements WritableComparable We give the enums indices so we can add types later and keep them
* grouped together rather than have to add them always to the end as we
* would have to if we used raw enum ordinals.
@@ -110,6 +110,8 @@ public abstract class EventHandler implements Runnable, Comparable
* This method is asynchronous.
@@ -545,7 +554,7 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
*/
void compactRegion(HRegionInfo regionInfo, boolean major, byte[] columnFamily)
throws NotServingRegionException, IOException;
-
+
/**
* Replicates the given entries. The guarantee is that the given entries
* will be durable on the slave cluster if this method returns without
@@ -615,24 +624,24 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Delete delete)
throws IOException;
-
+
/**
* Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects.
* This method could be fairly heavyweight in that it evaluates the entire HBase file-system
- * against what is in the RegionServer BlockCache.
- *
+ * against what is in the RegionServer BlockCache.
+ *
* @return BlockCacheColumnFamilySummary
* @throws IOException exception
*/
public List
* This is used for failover to recover the lost regions that belonged to
- * RegionServers which failed while there was no active master or regions
+ * RegionServers which failed while there was no active master or regions
* that were in RIT.
*
- *
+ *
* @param deadServers
* The list of dead servers which failed while there was no active
* master. Can be null.
@@ -2798,7 +2829,7 @@ public class AssignmentManager extends ZooKeeperListener {
List
+ * A region server could reject the close request because it either does not
+ * have the specified region.
+ * @param server server to merge regions
+ * @param region_a region to merge
+ * @param region_b region to merge
+ * @param forcible true if do a compulsory merge, otherwise we will only merge
+ * two adjacent regions
+ * @throws IOException
+ */
+ public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
+ HRegionInfo region_b, boolean forcible) throws IOException {
+ if (server == null)
+ throw new NullPointerException("Passed server is null");
+ if (region_a == null || region_b == null)
+ throw new NullPointerException("Passed region is null");
+
+ HRegionInterface hri = getServerConnection(server);
+ if (hri == null) {
+ throw new IOException("Attempting to send MERGE REGIONS RPC to server "
+ + server.toString() + " for region "
+ + region_a.getRegionNameAsString() + ","
+ + region_b.getRegionNameAsString()
+ + " failed because no RPC connection found to this server");
+ }
+ hri.mergeRegions(region_a, region_b, forcible);
+ }
+
+ /**
* @param sn
* @return
* @throws IOException
@@ -695,7 +726,7 @@ public class ServerManager {
}
}
}
-
+
/**
* To clear any dead server with same host name and port of any online server
*/
diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/DispatchMergingRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/DispatchMergingRegionHandler.java
new file mode 100644
index 0000000..e547172
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/master/handler/DispatchMergingRegionHandler.java
@@ -0,0 +1,162 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.CatalogJanitor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Handles MERGE regions request on master: move the regions together(on the
+ * same regionserver) and send MERGE RPC to regionserver.
+ *
+ * NOTE:The real merge is executed on the regionserver
+ *
+ */
+@InterfaceAudience.Private
+public class DispatchMergingRegionHandler extends EventHandler {
+ private static final Log LOG = LogFactory.getLog(DispatchMergingRegionHandler.class);
+ private final MasterServices masterServices;
+ private final CatalogJanitor catalogJanitor;
+ private HRegionInfo region_a;
+ private HRegionInfo region_b;
+ private final boolean forcible;
+ private final int timeout;
+
+ public DispatchMergingRegionHandler(final MasterServices services,
+ final CatalogJanitor catalogJanitor, final HRegionInfo region_a,
+ final HRegionInfo region_b, final boolean forcible) {
+ super(services, EventType.C_M_MERGE_REGION);
+ this.masterServices = services;
+ this.catalogJanitor = catalogJanitor;
+ this.region_a = region_a;
+ this.region_b = region_b;
+ this.forcible = forcible;
+ this.timeout = server.getConfiguration().getInt(
+ "hbase.master.regionmerge.timeout", 30 * 1000);
+ }
+
+ @Override
+ public void process() throws IOException {
+ boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(region_a);
+ if (regionAHasMergeQualifier
+ || !catalogJanitor.cleanMergeQualifier(region_b)) {
+ LOG.info("Skip merging regions " + region_a.getRegionNameAsString()
+ + ", " + region_b.getRegionNameAsString() + ", because region "
+ + (regionAHasMergeQualifier ? region_a.getEncodedName() : region_b
+ .getEncodedName()) + " has merge qualifier");
+ return;
+ }
+
+ AssignmentManager am = masterServices.getAssignmentManager();
+ ServerName region_a_location = am.getRegionServerOfRegion(region_a);
+ ServerName region_b_location = am.getRegionServerOfRegion(region_b);
+ if (region_a_location == null || region_b_location == null) {
+ LOG.info("Skip merging regions " + region_a.getRegionNameAsString()
+ + ", " + region_b.getRegionNameAsString() + ", because region "
+ + (region_a_location == null ? region_a.getEncodedName() : region_b
+ .getEncodedName()) + " is not online now");
+ return;
+ }
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ boolean onSameRS = region_a_location.equals(region_b_location);
+
+ // Make sure regions are on the same regionserver before send merge
+ // regions request to regionserver
+ if (!onSameRS) {
+ // Move region_b to region a's location, switch region_a and region_b if
+ // region_a's load lower than region_b's, so we will always move lower
+ // load region
+ RegionLoad loadOfRegionA = masterServices.getServerManager()
+ .getLoad(region_a_location).getRegionsLoad()
+ .get(region_a.getRegionName());
+ RegionLoad loadOfRegionB = masterServices.getServerManager()
+ .getLoad(region_b_location).getRegionsLoad()
+ .get(region_b.getRegionName());
+ if (loadOfRegionA != null && loadOfRegionB != null
+ && loadOfRegionA.getRequestsCount() < loadOfRegionB
+ .getRequestsCount()) {
+ // switch region_a and region_b
+ HRegionInfo tmpRegion = this.region_a;
+ this.region_a = this.region_b;
+ this.region_b = tmpRegion;
+ ServerName tmpLocation = region_a_location;
+ region_a_location = region_b_location;
+ region_b_location = tmpLocation;
+ }
+
+ RegionPlan regionPlan = new RegionPlan(region_b, region_b_location,
+ region_a_location);
+ masterServices.getAssignmentManager().balance(regionPlan);
+ while (!masterServices.isStopped()) {
+ try {
+ Thread.sleep(20);
+ region_b_location = masterServices.getAssignmentManager()
+ .getRegionServerOfRegion(region_b);
+ onSameRS = region_a_location.equals(region_b_location);
+ if (onSameRS || am.isRegionInTransition(region_b) == null) {
+ // Regions are on the same RS, or region_b is not in
+ // RegionInTransition any more
+ break;
+ }
+ if ((EnvironmentEdgeManager.currentTimeMillis() - startTime) > timeout) break;
+ } catch (InterruptedException e) {
+ InterruptedIOException iioe = new InterruptedIOException();
+ iioe.initCause(e);
+ throw iioe;
+ }
+ }
+ }
+
+ if (onSameRS) {
+ try{
+ masterServices.getServerManager().sendRegionsMerge(region_a_location,
+ region_a, region_b, forcible);
+ LOG.info("Successfully send MERGE REGIONS RPC to server "
+ + region_a_location.toString() + " for region "
+ + region_a.getRegionNameAsString() + ","
+ + region_b.getRegionNameAsString() + ", focible=" + forcible);
+ } catch (IOException ie) {
+ LOG.info("Failed send MERGE REGIONS RPC to server "
+ + region_a_location.toString() + " for region "
+ + region_a.getRegionNameAsString() + ","
+ + region_b.getRegionNameAsString() + ", focible=" + forcible + ", "
+ + ie.getMessage());
+ }
+ } else {
+ LOG.info("Cancel merging regions " + region_a.getRegionNameAsString()
+ + ", " + region_b.getRegionNameAsString()
+ + ", because can't move them together after "
+ + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/MergedRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/MergedRegionHandler.java
new file mode 100644
index 0000000..e8898ad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/master/handler/MergedRegionHandler.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Handles MERGE regions event on Master, master receive the merge report from
+ * the regionserver, then offline the merging regions and online the merged
+ * region.Here region_a sorts before region_b.
+ */
+@InterfaceAudience.Private
+public class MergedRegionHandler extends EventHandler implements
+ TotesHRegionInfo {
+ private static final Log LOG = LogFactory.getLog(MergedRegionHandler.class);
+ private final AssignmentManager assignmentManager;
+ private final HRegionInfo merged;
+ private final HRegionInfo region_a;
+ private final HRegionInfo region_b;
+ private final ServerName sn;
+
+ public MergedRegionHandler(Server server,
+ AssignmentManager assignmentManager, ServerName sn,
+ final List
+ * Here is an example of how you would use this class:
+ *
+ *
+ * This class is not thread safe. Caller needs ensure merge is run by one thread
+ * only.
+ */
+@InterfaceAudience.Private
+public class RegionMergeTransaction {
+ private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);
+
+ // Merged region info
+ private HRegionInfo mergedRegionInfo;
+ // region_a sorts before region_b
+ private final HRegion region_a;
+ private final HRegion region_b;
+ // merges dir is under region_a
+ private final Path mergesdir;
+ private int znodeVersion = -1;
+ // We only merge adjacent regions if forcible is false
+ private final boolean forcible;
+
+ /**
+ * Types to add to the transaction journal. Each enum is a step in the merge
+ * transaction. Used to figure how much we need to rollback.
+ */
+ enum JournalEntry {
+ /**
+ * Set region as in transition, set it into MERGING state.
+ */
+ SET_MERGING_IN_ZK,
+ /**
+ * We created the temporary merge data directory.
+ */
+ CREATED_MERGE_DIR,
+ /**
+ * Closed the merging region A.
+ */
+ CLOSED_REGION_A,
+ /**
+ * The merging region A has been taken out of the server's online regions list.
+ */
+ OFFLINED_REGION_A,
+ /**
+ * Closed the merging region B.
+ */
+ CLOSED_REGION_B,
+ /**
+ * The merging region B has been taken out of the server's online regions list.
+ */
+ OFFLINED_REGION_B,
+ /**
+ * Started in on creation of the merged region.
+ */
+ STARTED_MERGED_REGION_CREATION,
+ /**
+ * Point of no return. If we got here, then transaction is not recoverable
+ * other than by crashing out the regionserver.
+ */
+ PONR
+ }
+
+ /*
+ * Journal of how far the merge transaction has progressed.
+ */
+ private final List
+ * Does not transition nodes from other states. If a node already exists for
+ * this region, a {@link NodeExistsException} will be thrown.
+ *
+ * @param zkw zk reference
+ * @param region region to be created as offline
+ * @param serverName server event originates from
+ * @return Version of znode created.
+ * @throws KeeperException
+ * @throws IOException
+ */
+ int createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
+ final ServerName serverName) throws KeeperException, IOException {
+ LOG.debug(zkw.prefix("Creating ephemeral node for "
+ + region.getEncodedName() + " in MERGING state"));
+ RegionTransitionData rt = new RegionTransitionData(
+ EventType.RS_ZK_REGION_MERGING, region.getRegionName(), serverName);
+ String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
+ if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.getBytes())) {
+ throw new IOException("Failed create of ephemeral " + node);
+ }
+ // Transition node from MERGING to MERGING and pick up version so we
+ // can be sure this znode is ours; version is needed deleting.
+ return transitionNodeMerging(zkw, region, serverName, -1);
+ }
+
+ /**
+ * Transitions an existing node for the specified region which is currently in
+ * the MERGING state to be in the MERGE state. Converts the ephemeral MERGING
+ * znode to an ephemeral MERGE node. Master cleans up MERGE znode when it
+ * reads it (or if we crash, zk will clean it up).
+ *
+ *
+ * Does not transition nodes from other states. If for some reason the node
+ * could not be transitioned, the method returns -1. If the transition is
+ * successful, the version of the node after transition is returned.
+ *
+ *
+ * This method can fail and return false for three different reasons:
+ *
+ * Does not set any watches.
+ *
+ *
+ * This method should only be used by a RegionServer when completing the open
+ * of merged region.
+ *
+ * @param zkw zk reference
+ * @param merged region to be transitioned to opened
+ * @param a merging region A
+ * @param b merging region B
+ * @param serverName server event originates from
+ * @param znodeVersion expected version of data before modification
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException if unexpected zookeeper exception
+ * @throws IOException
+ */
+ private static int transitionNodeMerge(ZooKeeperWatcher zkw,
+ HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
+ final int znodeVersion) throws KeeperException, IOException {
+ byte[] payload = Writables.getBytes(merged, a, b);
+ return ZKAssign.transitionNode(zkw, merged, serverName,
+ EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGE,
+ znodeVersion, payload);
+ }
+
+ /**
+ *
+ * @param zkw zk reference
+ * @param parent region to be transitioned to merging
+ * @param serverName server event originates from
+ * @param version znode version
+ * @return version of node after transition, -1 if unsuccessful transition
+ * @throws KeeperException
+ * @throws IOException
+ */
+ int transitionNodeMerging(final ZooKeeperWatcher zkw,
+ final HRegionInfo parent, final ServerName serverName, final int version)
+ throws KeeperException, IOException {
+ return ZKAssign.transitionNode(zkw, parent, serverName,
+ EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGING,
+ version);
+ }
+
+ private static int tickleNodeMerge(ZooKeeperWatcher zkw, HRegionInfo merged,
+ HRegionInfo a, HRegionInfo b, ServerName serverName,
+ final int znodeVersion) throws KeeperException, IOException {
+ byte[] payload = Writables.getBytes(merged, a, b);
+ return ZKAssign.transitionNode(zkw, merged, serverName,
+ EventType.RS_ZK_REGION_MERGE, EventType.RS_ZK_REGION_MERGE,
+ znodeVersion, payload);
+ }
+
+ /**
+ * Checks if the given region has merge qualifier in .META.
+ * @param services
+ * @param regionName name of specified region
+ * @return true if the given region has merge qualifier in META.(It will be
+ * cleaned by CatalogJanitor)
+ * @throws IOException
+ */
+ boolean hasMergeQualifierInMeta(final RegionServerServices services,
+ final byte[] regionName) throws IOException {
+ // Get merge regions if it is a merged region and already has merge
+ // qualifier
+ PairregionInfo as a RIT.
* @throws KeeperException
* @throws IOException
@@ -520,7 +521,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (data == null) return false;
HRegionInfo hri = regionInfo;
if (hri == null) {
- if ((hri = getHRegionInfo(data)) == null) return false;
+ if ((hri = getHRegionInfo(data)) == null) return false;
}
processRegionsInTransition(data, hri, deadServers, stat.getVersion());
return true;
@@ -641,7 +642,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
-
+
/**
* Put the region hri into an offline state up in zk.
@@ -794,6 +795,34 @@ public class AssignmentManager extends ZooKeeperListener {
regionState.getRegion(), sn, daughters));
break;
+ case RS_ZK_REGION_MERGING:
+ // Merged region is a new region, we can't find it in the region states now.
+ // Do nothing.
+ break;
+
+ case RS_ZK_REGION_MERGE:
+ // Assert that we can get a serverinfo for this server.
+ if (!this.serverManager.isServerOnline(sn)) {
+ LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
+ break;
+ }
+ // Get merged and merging regions.
+ byte[] payloadOfMerge = data.getPayload();
+ List.META. catalog
* table on a period looking for unused regions to garbage collect.
*/
-class CatalogJanitor extends Chore {
+public class CatalogJanitor extends Chore {
private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
private final Server server;
private final MasterServices services;
@@ -96,16 +97,38 @@ class CatalogJanitor extends Chore {
}
/**
- * Scans META and returns a number of scanned rows, and
- * an ordered map of split parents.
+ * Scans META and returns a number of scanned rows, and a map of merged
+ * regions, and an ordered map of split parents.
+ * @return triple of scanned rows, map of merged regions and map of split
+ * parent regioninfos
+ * @throws IOException
*/
- Pair.META. table looking for
* garbage to collect.
+ * @return number of cleaned regions
* @throws IOException
*/
int scan() throws IOException {
- Pairf -- parentdir is family,
+ // then the directory above is the region name.
+ String mergingRegionName = regionInfo.getEncodedName();
+ // Write reference with same file id only with the other region name as
+ // suffix and into the new region location (under same family).
+ Path p = new Path(referenceDir, f.getPath().getName() + "."
+ + mergingRegionName);
+ return r.write(fs, p);
+ }
+
+ /**
+ * Commit a merged region, moving it from the merges temporary directory to
+ * the proper location in the filesystem.
+ * @param mergedRegionInfo merged region {@link HRegionInfo}
+ * @throws IOException
+ */
+ void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
+ Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
+ Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
+ // Move the tmp dir in the expected location
+ if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
+ if (!fs.rename(mergedRegionTmpDir, regionDir)) {
+ throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
+ + regionDir);
+ }
+ }
+ }
+
+ /**
+ * Create a merged region given a temp directory with the region data.
+ * @param mergedRegionInfo
+ * @param region_b another merging region
+ * @return merged hregion
+ * @throws IOException
+ */
+ HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
+ final HRegion region_b) throws IOException {
+ HRegion r = HRegion.newHRegion(this.getTableDir(), this.getLog(),
+ getFilesystem(), this.getConf(), mergedRegionInfo,
+ this.getTableDesc(), this.rsServices);
+ r.readRequestsCount.set(this.getReadRequestsCount()
+ + region_b.getReadRequestsCount());
+ r.writeRequestsCount.set(this.getWriteRequestsCount()
+ + region_b.getWriteRequestsCount());
+ this.commitMergedRegion(mergedRegionInfo);
+ return r;
+ }
+
/**
* Inserts a new region's meta information into the passed
* meta region. Used by the HMaster bootstrap code adding
@@ -4567,11 +4698,11 @@ public class HRegion implements HeapSize { // , Writable{
long totalReadRequestCount = a.readRequestsCount.get() + b.readRequestsCount.get();
dstRegion.readRequestsCount.set(totalReadRequestCount);
dstRegion.opMetrics.setReadRequestCountMetrics(totalReadRequestCount);
-
+
long totalWriteRequestCount = a.writeRequestsCount.get() + b.writeRequestsCount.get();
dstRegion.writeRequestsCount.set(totalWriteRequestCount);
dstRegion.opMetrics.setWriteRequestCountMetrics(totalWriteRequestCount);
-
+
dstRegion.initialize();
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
@@ -5645,7 +5776,7 @@ public class HRegion implements HeapSize { // , Writable{
{
this.opMetrics.setReadRequestCountMetrics(value);
}
-
+
/*
* Set the write request count defined in opMetrics
* @param value absolute value of write request count
@@ -5654,7 +5785,7 @@ public class HRegion implements HeapSize { // , Writable{
{
this.opMetrics.setWriteRequestCountMetrics(value);
}
-
+
/** @param coprocessorHost the new coprocessor host */
public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
this.coprocessorHost = coprocessorHost;
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e9b9ef3..9c9d04b 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3201,6 +3201,29 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
compactSplitThread.requestSplit(region, region.checkSplit());
}
+ /**
+ * Merge two regions. Asynchronous operation.
+ * @param encodedNameOfRegionA encoded name of region a
+ * @param encodedNameOfRegionB encoded name of region b
+ * @param forcible true if do a compulsory merge, otherwise we will only merge
+ * two adjacent regions
+ * @throws IOException
+ */
+ @Override
+ @QosPriority(priority = HConstants.HIGH_QOS)
+ public void mergeRegions(final HRegionInfo regionInfoA,
+ final HRegionInfo regionInfoB, final boolean forcible) throws IOException {
+ checkOpen();
+ requestCount.incrementAndGet();
+ HRegion regionA = getRegion(regionInfoA.getRegionName());
+ HRegion regionB = getRegion(regionInfoB.getRegionName());
+ LOG.info("Receiving merging request for " + regionA + ", " + regionB
+ + ",forcible=" + forcible);
+ regionA.flushcache();
+ regionB.flushcache();
+ compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
+ }
+
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public void compactRegion(HRegionInfo regionInfo, boolean major)
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
new file mode 100644
index 0000000..35b6942
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -0,0 +1,112 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing region merges. Put in a queue, owned by HRegionServer.
+ */
+@InterfaceAudience.Private
+class RegionMergeRequest implements Runnable {
+ static final Log LOG = LogFactory.getLog(RegionMergeRequest.class);
+ private final HRegion region_a;
+ private final HRegion region_b;
+ private final HRegionServer server;
+ private final boolean forcible;
+
+ RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible) {
+ Preconditions.checkNotNull(hrs);
+ this.region_a = a;
+ this.region_b = b;
+ this.server = hrs;
+ this.forcible = forcible;
+ }
+
+ @Override
+ public String toString() {
+ return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible="
+ + forcible;
+ }
+
+ @Override
+ public void run() {
+ if (this.server.isStopping() || this.server.isStopped()) {
+ LOG.debug("Skipping merge because server is stopping="
+ + this.server.isStopping() + " or stopped=" + this.server.isStopped());
+ return;
+ }
+ try {
+ final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ RegionMergeTransaction mt = new RegionMergeTransaction(region_a,
+ region_b, forcible);
+ // If prepare does not return true, for some reason -- logged inside in
+ // the prepare call -- we are not ready to merge just now. Just return.
+ if (!mt.prepare(this.server)) return;
+ try {
+ mt.execute(this.server, this.server);
+ } catch (Exception e) {
+ if (this.server.isStopping() || this.server.isStopped()) {
+ LOG.info(
+ "Skip rollback/cleanup of failed merge of " + region_a + " and "
+ + region_b + " because server is"
+ + (this.server.isStopping() ? " stopping" : " stopped"), e);
+ return;
+ }
+ try {
+ LOG.warn("Running rollback/cleanup of failed merge of "
+ + region_a +" and "+ region_b + "; " + e.getMessage(), e);
+ if (mt.rollback(this.server, this.server)) {
+ LOG.info("Successful rollback of failed merge of "
+ + region_a +" and "+ region_b);
+ } else {
+ this.server.abort("Abort; we got an error after point-of-no-return"
+ + "when merging " + region_a + " and " + region_b);
+ }
+ } catch (RuntimeException ee) {
+ String msg = "Failed rollback of failed merge of "
+ + region_a +" and "+ region_b + " -- aborting server";
+ // If failed rollback, kill this server to avoid having a hole in
+ // table.
+ LOG.info(msg, ee);
+ this.server.abort(msg);
+ }
+ return;
+ }
+ LOG.info("Regions merged, META updated, and report to master. region_a="
+ + region_a + ", region_b=" + region_b + ",merged region="
+ + mt.getMergedRegionInfo().getRegionNameAsString()
+ + ". Region merge took "
+ + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), startTime));
+ } catch (IOException ex) {
+ LOG.error("Merge failed " + this,
+ RemoteExceptionHandler.checkIOException(ex));
+ server.checkFileSystem();
+ }
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
new file mode 100644
index 0000000..79b1965
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -0,0 +1,791 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+/**
+ * Executes region merge as a "transaction". It is similar with
+ * SplitTransaction. Call {@link #prepare(RegionServerServices)} to setup the
+ * transaction, {@link #execute(Server, RegionServerServices)} to run the
+ * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if
+ * execute fails.
+ *
+ *
+ * RegionMergeTransaction mt = new RegionMergeTransaction(this.conf, parent, midKey)
+ * if (!mt.prepare(services)) return;
+ * try {
+ * mt.execute(server, services);
+ * } catch (IOException ioe) {
+ * try {
+ * mt.rollback(server, services);
+ * return;
+ * } catch (RuntimeException e) {
+ * myAbortable.abort("Failed merge, abort");
+ * }
+ * }
+ *
+ * true if the regions are mergeable else
+ * false if they are not (e.g. its already closed, etc.).
+ */
+ public boolean prepare(final RegionServerServices services) {
+ if (!region_a.getTableDesc().getNameAsString()
+ .equals(region_b.getTableDesc().getNameAsString())) {
+ LOG.info("Can't merge regions " + region_a + "," + region_b
+ + " because they do not belong to the same table");
+ return false;
+ }
+ if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
+ LOG.info("Can't merge the same region " + region_a);
+ return false;
+ }
+ if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
+ region_b.getRegionInfo())) {
+ String msg = "Skip merging " + this.region_a.getRegionNameAsString()
+ + " and " + this.region_b.getRegionNameAsString()
+ + ", because they are not adjacent.";
+ LOG.info(msg);
+ return false;
+ }
+ if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
+ return false;
+ }
+ try {
+ boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
+ region_a.getRegionName());
+ if (regionAHasMergeQualifier ||
+ hasMergeQualifierInMeta(services, region_b.getRegionName())) {
+ LOG.debug("Region " + (regionAHasMergeQualifier ? region_a.getRegionNameAsString()
+ : region_b.getRegionNameAsString())
+ + " is not mergeable because it has merge qualifier in META");
+ return false;
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed judging whether merge transaction is available for "
+ + region_a.getRegionNameAsString() + " and "
+ + region_b.getRegionNameAsString(), e);
+ return false;
+ }
+
+ // WARN: make sure there is no parent region of the two merging regions in
+ // .META. If exists, fixing up daughters would cause daughter regions(we
+ // have merged one) online again when we restart master, so we should clear
+ // the parent region to prevent the above case
+ // Since HBASE-7721, we don't need fix up daughters any more. so here do
+ // nothing
+
+ this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
+ region_b.getRegionInfo());
+ return true;
+ }
+
+ /**
+ * Run the transaction.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}
+ * @return merged region
+ * @throws IOException
+ * @see #rollback(Server, RegionServerServices)
+ */
+ public HRegion execute(final Server server,
+ final RegionServerServices services) throws IOException {
+ HRegion mergedRegion = createMergedRegion(server, services);
+ openMergedRegion(server, services, mergedRegion);
+ transitionZKNode(server, services);
+ return mergedRegion;
+ }
+
+ /**
+ * Prepare the merged region and region files.
+ * @param server Hosting server instance. Can be null when testing (won't try
+ * and update in zk if a null server)
+ * @param services Used to online/offline regions.
+ * @return merged region
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link #rollback(Server, RegionServerServices)}
+ */
+ HRegion createMergedRegion(final Server server,
+ final RegionServerServices services) throws IOException {
+ LOG.info("Starting merge of " + region_a + " and "
+ + region_b.getRegionNameAsString() + ", forcible=" + forcible);
+ if ((server != null && server.isStopped())
+ || (services != null && services.isStopping())) {
+ throw new IOException("Server is stopped or stopping");
+ }
+
+ // If true, no cluster to write meta edits to or to update znodes in.
+ boolean testing = server == null ? true : server.getConfiguration()
+ .getBoolean("hbase.testing.nocluster", false);
+
+ // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
+ // have zookeeper so don't do zk stuff if server or zookeeper is null
+ if (server != null && server.getZooKeeper() != null) {
+ try {
+ createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
+ server.getServerName());
+ } catch (KeeperException e) {
+ throw new IOException("Failed creating MERGING znode on "
+ + this.mergedRegionInfo.getRegionNameAsString(), e);
+ }
+ }
+ this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
+ if (server != null && server.getZooKeeper() != null) {
+ try {
+ // Transition node from MERGING to MERGING after creating the merge
+ // node. Master will get the callback for node change only if the
+ // transition is successful.
+ // Note that if the transition fails then the rollback will delete the
+ // created znode as the journal entry SET_MERGING_IN_ZK is added.
+ this.znodeVersion = transitionNodeMerging(server.getZooKeeper(),
+ this.mergedRegionInfo, server.getServerName(), -1);
+ } catch (KeeperException e) {
+ throw new IOException("Failed setting MERGING znode on "
+ + this.mergedRegionInfo.getRegionNameAsString(), e);
+ }
+ }
+
+ this.region_a.createMergesDir();
+ this.journal.add(JournalEntry.CREATED_MERGE_DIR);
+
+ Listf -- parentdir is family,
+ // then the directory above is the region name.
+ String mergingRegionName = regionInfo.getEncodedName();
+ // Write reference with same file id only with the other region name as
+ // suffix and into the new region location (under same family).
+ Path p = new Path(referenceDir, f.getPath().getName() + "."
+ + mergingRegionName);
+ return r.write(fs, p);
+ }
+
+ /**
+ * @param server Hosting server instance (May be null when testing).
+ * @param services Services of regionserver, used to online regions.
+ * @throws IOException If thrown, rollback failed. Take drastic action.
+ * @return True if we successfully rolled back, false if we got to the point
+ * of no return and so now need to abort the server to minimize
+ * damage.
+ */
+ public boolean rollback(final Server server,
+ final RegionServerServices services) throws IOException {
+ assert this.mergedRegionInfo != null;
+ boolean result = true;
+ ListIterator
+ *
+ *
+ *