diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index c01e722..33ccf1e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -188,7 +189,7 @@ public class MetaTableAccessor {
* @return An {@link HTable} for hbase:meta
* @throws IOException
*/
- static HTable getMetaHTable(final HConnection hConnection)
+ public static HTable getMetaHTable(final HConnection hConnection)
throws IOException {
return getHTable(hConnection, TableName.META_TABLE_NAME);
}
@@ -749,8 +750,15 @@ public class MetaTableAccessor {
if (replicaId < 0) {
break;
}
-
- locations.add(getRegionLocation(r, regionInfo, replicaId));
+ byte[] parent = getParentReplicaReferenceQualifier(replicaId);
+ // If a parent replica reference column ("parent_&lt;replicaId&gt;") is present, its
+ // value is the HRegionInfo of the (split) parent's replica. Use that as the
+ // location of the daughter replica until the daughter replica is actually created and assigned
+ HRegionInfo h = getHRegionInfo(r, parent);
+ if (h == null) {
+ h = regionInfo;
+ }
+ locations.add(getRegionLocation(r, h, replicaId));
}
return new RegionLocations(locations);
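
For illustration, this is roughly what a daughter's meta row looks like right after a split (a sketch; the qualifier names assume the '_' delimiter and the %04X replica-id format used for the other per-replica columns, and the values are abbreviated):

    info:regioninfo       = <daughter HRegionInfo>
    info:server           = <host:port that opened daughter replica 0>
    info:seqnumDuringOpen = <1>
    info:parent_0001      = <serialized HRegionInfo of the parent's replica 1>

Until replica 1 of the daughter is opened, getRegionLocations() fills slot 1 with the parent replica's HRegionInfo, so timeline-consistent reads keep going to the still-open parent replica instead of failing on a not-yet-assigned daughter replica.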
@@ -1211,6 +1219,9 @@ public class MetaTableAccessor {
HRegionInfo copyOfParent = new HRegionInfo(parent);
copyOfParent.setOffline(true);
copyOfParent.setSplit(true);
+ byte[] key = MetaTableAccessor.getMetaKeyForRegion(parent);
+ Result result = meta.get(new Get(key));
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
//Put for parent
Put putParent = makePutFromRegionInfo(copyOfParent);
@@ -1222,6 +1233,20 @@ public class MetaTableAccessor {
addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
addLocation(putB, sn, 1, splitB.getReplicaId());
+ // bootstrap the daughter replicas to point to the locations of the old parent
+ // replicas via the parent replica reference columns ("parent_&lt;replicaId&gt;")
+ for (int i = 1; i < rl.size(); i++) {
+ byte[] parentHri;
+ if (rl.getRegionLocation(i) == null) { // nothing known about this replica; skip it
+ continue;
+ } else {
+ parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray();
+ }
+ putA.addImmutable(HConstants.CATALOG_FAMILY,
+ MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri);
+ putB.addImmutable(HConstants.CATALOG_FAMILY,
+ MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri);
+ }
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
multiMutate(meta, tableRow, putParent, putA, putB);
@@ -1229,11 +1254,23 @@ public class MetaTableAccessor {
meta.close();
}
}
+ /**
+ * Returns the column qualifier for the parent replica reference column for replicaId.
+ * The daughter rows of a split carry this column until the corresponding daughter
+ * replica is opened.
+ * @param replicaId the replicaId of the region
+ * @return a byte[] for the parent replica reference column qualifier
+ */
+ @VisibleForTesting
+ public static byte[] getParentReplicaReferenceQualifier(int replicaId) {
+ return replicaId == 0
+ ? HConstants.PARENT_REPLICA_REFERENCE
+ : Bytes.toBytes(HConstants.PARENT_REPLICA_REFERENCE_STR + META_REPLICA_ID_DELIMITER
+ + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+ }
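
A minimal sketch of what this helper yields, assuming META_REPLICA_ID_DELIMITER is '_' and REPLICA_ID_FORMAT is the %04X format used for the other per-replica meta columns:

    byte[] q0 = MetaTableAccessor.getParentReplicaReferenceQualifier(0); // "parent"
    byte[] q1 = MetaTableAccessor.getParentReplicaReferenceQualifier(1); // "parent_0001"
    byte[] q2 = MetaTableAccessor.getParentReplicaReferenceQualifier(2); // "parent_0002"

Since all of these share the "parent" prefix and meta qualifiers are sorted, a scanner such as the CatalogJanitor can pick up every reference column in a row with a single tailMap over the info family.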
/**
* Performs an atomic multi-Mutate operation against the given table.
*/
- private static void multiMutate(HTable table, byte[] row, Mutation... mutations)
+ public static void multiMutate(HTableInterface table, byte[] row, Mutation... mutations)
throws IOException {
CoprocessorRpcChannel channel = table.coprocessorService(row);
MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
@@ -1295,9 +1332,19 @@ public class MetaTableAccessor {
HRegionInfo regionInfo, ServerName sn, long openSeqNum)
throws IOException {
// region replicas are kept in the primary region's row
- Put put = new Put(getMetaKeyForRegion(regionInfo));
+ byte[] metaRow = getMetaKeyForRegion(regionInfo);
+ Put put = new Put(metaRow);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
- putToMetaTable(hConnection, put);
+ if (regionInfo.getReplicaId() != 0) {
+ // The actual daughter replica location was added to the Put above.
+ // Delete the parent reference so replica lookups are no longer redirected there.
+ Delete d = new Delete(metaRow);
+ d.deleteColumn(HConstants.CATALOG_FAMILY,
+ MetaTableAccessor.getParentReplicaReferenceQualifier(regionInfo.getReplicaId()));
+ multiMutate(MetaTableAccessor.getMetaHTable(hConnection), metaRow, put, d);
+ } else {
+ putToMetaTable(hConnection, put);
+ }
LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
" with server=" + sn);
}
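
Put differently, when a secondary daughter replica finally opens, its location put and the parent reference delete commit as one atomic multi-mutate on the daughter's row. A sketch of the equivalent standalone calls, assuming a daughterReplica with replicaId 1, its server sn, an openSeqNum, and an hConnection in scope:

    byte[] row = MetaTableAccessor.getMetaKeyForRegion(daughterReplica);
    Put put = new Put(row);
    // location columns for replica 1 (server_0001, serverstartcode_0001, ...)
    MetaTableAccessor.addLocation(put, sn, openSeqNum, 1);
    Delete del = new Delete(row);
    // drop the redirect to the parent's replica 1
    del.deleteColumn(HConstants.CATALOG_FAMILY,
        MetaTableAccessor.getParentReplicaReferenceQualifier(1));
    // both mutations commit together, so readers never observe a row that has
    // neither a real location nor a parent reference for replica 1
    MetaTableAccessor.multiMutate(MetaTableAccessor.getMetaHTable(hConnection), row, put, del);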
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 93209fd..e9c9195 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -430,6 +430,12 @@ public final class HConstants {
public static final byte [] SERVERNAME_QUALIFIER = Bytes.toBytes(SERVERNAME_QUALIFIER_STR);
+ /** The parent replica reference qualifier, as a String */
+ public static final String PARENT_REPLICA_REFERENCE_STR = "parent";
+ /** The parent replica reference qualifier */
+ public static final byte [] PARENT_REPLICA_REFERENCE =
+ Bytes.toBytes(PARENT_REPLICA_REFERENCE_STR);
+
/** The lower-half split region column qualifier */
public static final byte [] SPLITA_QUALIFIER = Bytes.toBytes("splitA");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 2888e1e..11b9ec2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -30,8 +30,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Result;
@@ -148,6 +151,8 @@ public class AssignmentManager extends ZooKeeperListener {
final private KeyLocker<String> locker = new KeyLocker<String>();
+ /** Replicas of split parents and merged regions that still need to be closed */
+ Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
+
/**
* Map of regions to reopen after the schema of a table is changed. Key -
* encoded region name, value - HRegionInfo
@@ -616,6 +621,12 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("Clean cluster startup. Assigning user regions");
assignAllUserRegions(allRegions);
}
+ // Unassign replicas of the split parents and the merged regions.
+ // The daughter replicas are opened in assignAllUserRegions if they were
+ // not already opened.
+ for (HRegionInfo h : replicasToClose) {
+ unassign(h);
+ }
return failover;
}
@@ -798,7 +809,11 @@ public class AssignmentManager extends ZooKeeperListener {
case RS_ZK_REGION_FAILED_OPEN:
// Region is closed, insert into RIT and handle it
regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
- invokeAssign(regionInfo);
+ if (!replicasToClose.contains(regionInfo)) {
+ invokeAssign(regionInfo);
+ } else {
+ offlineDisabledRegion(regionInfo);
+ }
break;
case M_ZK_REGION_OFFLINE:
@@ -2207,7 +2222,7 @@ public class AssignmentManager extends ZooKeeperListener {
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
if (this.tableStateManager.isTableState(region.getTable(),
ZooKeeperProtos.Table.State.DISABLED,
- ZooKeeperProtos.Table.State.DISABLING)) {
+ ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
LOG.info("Table " + region.getTable() + " is disabled or disabling;"
+ " skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);
@@ -2461,7 +2476,7 @@ public class AssignmentManager extends ZooKeeperListener {
lock.unlock();
// Region is expected to be reassigned afterwards
- if (reassign && regionStates.isRegionOffline(region)) {
+ if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
assign(region, true);
}
}
@@ -2768,6 +2783,19 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.debug("null result from meta - ignoring but this is strange.");
continue;
}
+ // Keep track of replicas to close. These were the replicas of the originally
+ // unmerged regions. The master may have closed them already, but may not
+ // have (for example, because it crashed mid-operation).
+ PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
+ if (p.getFirst() != null && p.getSecond() != null) {
+ int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
+ getTable()).getRegionReplication();
+ for (HRegionInfo merge : p) {
+ for (int i = 1; i < numReplicas; i++) {
+ replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
+ }
+ }
+ }
RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
if (rl == null) continue;
HRegionLocation[] locations = rl.getRegionLocations();
@@ -2777,6 +2805,14 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionInfo == null) continue;
int replicaId = regionInfo.getReplicaId();
State state = RegionStateStore.getRegionState(result, replicaId);
+ // Keep track of replicas to close. These were the replicas of the split parents
+ // from the previous life of the master. The master should have closed them
+ // already, but may not have (for example, because it crashed).
+ if (replicaId == 0 && State.SPLIT.equals(state)) {
+ for (HRegionLocation h : locations) {
+ replicasToClose.add(h.getRegionInfo());
+ }
+ }
ServerName lastHost = hrl.getServerName();
ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
@@ -3380,7 +3416,8 @@ public class AssignmentManager extends ZooKeeperListener {
// When there are more than one region server a new RS is selected as the
// destination and the same is updated in the region plan. (HBASE-5546)
if (getTableStateManager().isTableState(hri.getTable(),
- ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+ ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
+ replicasToClose.contains(hri)) {
offlineDisabledRegion(hri);
return;
}
@@ -3420,7 +3457,8 @@ public class AssignmentManager extends ZooKeeperListener {
private void onRegionClosed(final HRegionInfo hri) {
if (getTableStateManager().isTableState(hri.getTable(),
- ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+ ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
+ replicasToClose.contains(hri)) {
offlineDisabledRegion(hri);
return;
}
@@ -3432,8 +3470,8 @@ public class AssignmentManager extends ZooKeeperListener {
}
private String onRegionSplit(ServerName sn, TransitionCode code,
- HRegionInfo p, HRegionInfo a, HRegionInfo b) {
- RegionState rs_p = regionStates.getRegionState(p);
+ final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
+ final RegionState rs_p = regionStates.getRegionState(p);
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (!(rs_p.isOpenOrSplittingOnServer(sn)
@@ -3459,6 +3497,15 @@ public class AssignmentManager extends ZooKeeperListener {
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
invokeUnAssign(a);
invokeUnAssign(b);
+ } else {
+ Callable<Object> splitReplicasCallable = new Callable<Object>() {
+ @Override
+ public Object call() {
+ doSplittingOfReplicas(p, a, b);
+ return null;
+ }
+ };
+ threadPoolExecutorService.submit(splitReplicasCallable);
}
} else if (code == TransitionCode.SPLIT_PONR) {
try {
@@ -3481,7 +3528,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
private String onRegionMerge(ServerName sn, TransitionCode code,
- HRegionInfo p, HRegionInfo a, HRegionInfo b) {
+ final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
RegionState rs_p = regionStates.getRegionState(p);
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
@@ -3508,6 +3555,15 @@ public class AssignmentManager extends ZooKeeperListener {
if (getTableStateManager().isTableState(p.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
invokeUnAssign(p);
+ } else {
+ Callable<Object> mergeReplicasCallable = new Callable<Object>() {
+ @Override
+ public Object call() {
+ doMergingOfReplicas(p, a, b);
+ return null;
+ }
+ };
+ threadPoolExecutorService.submit(mergeReplicasCallable);
}
} else if (code == TransitionCode.MERGE_PONR) {
try {
@@ -3614,6 +3670,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
if (et == EventType.RS_ZK_REGION_MERGED) {
+ doMergingOfReplicas(p, hri_a, hri_b);
LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
// Remove region from ZK
try {
@@ -3740,6 +3797,8 @@ public class AssignmentManager extends ZooKeeperListener {
}
if (et == EventType.RS_ZK_REGION_SPLIT) {
+ // split replicas
+ doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
// Remove region from ZK
try {
@@ -3772,6 +3831,110 @@ public class AssignmentManager extends ZooKeeperListener {
return true;
}
+ private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
+ final HRegionInfo hri_b) {
+ // Close replicas for the original unmerged regions. create/assign new replicas
+ // for the merged parent.
+ List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
+ unmergedRegions.add(hri_a);
+ unmergedRegions.add(hri_b);
+ Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
+ Collection<List<HRegionInfo>> c = map.values();
+ for (List<HRegionInfo> l : c) {
+ for (HRegionInfo h : l) {
+ if (!RegionReplicaUtil.isDefaultReplica(h)) {
+ LOG.debug("Unassigning un-merged replica " + h);
+ unassign(h);
+ }
+ }
+ }
+ int numReplicas = 1;
+ try {
+ numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
+ getRegionReplication();
+ } catch (IOException e) {
+ LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
+ " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
+ "will not be done");
+ }
+ List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+ for (int i = 1; i < numReplicas; i++) {
+ regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
+ }
+ try {
+ assign(regions);
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
+ ioe.getMessage());
+ } catch (InterruptedException ie) {
+ LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
+ ie.getMessage());
+ }
+ }
+
+ private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
+ final HRegionInfo hri_b) {
+ // Create new replica regions for the daughters, and assign them so they match
+ // the current parent replica assignments: if replica 1 of the parent is on RS1,
+ // replica 1 of each daughter will be placed on the same server.
+ int numReplicas = 1;
+ try {
+ numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
+ getRegionReplication();
+ } catch (IOException e) {
+ LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
+ " due to " + e.getMessage() + ". The assignment of daughter replicas " +
+ "replicas will not be done");
+ }
+ // unassign the old replicas
+ List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
+ parentRegion.add(parentHri);
+ Map<ServerName, List<HRegionInfo>> currentAssign =
+ regionStates.getRegionAssignments(parentRegion);
+ Collection<List<HRegionInfo>> c = currentAssign.values();
+ for (List<HRegionInfo> l : c) {
+ for (HRegionInfo h : l) {
+ if (!RegionReplicaUtil.isDefaultReplica(h)) {
+ LOG.debug("Unassigning parent's replica " + h);
+ unassign(h);
+ }
+ }
+ }
+ // assign daughter replicas
+ Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
+ for (int i = 1; i < numReplicas; i++) {
+ prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
+ prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
+ }
+ try {
+ assign(map);
+ } catch (IOException e) {
+ LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
+ }
+ }
+
+ private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
+ int replicaId, Map<HRegionInfo, ServerName> map) {
+ HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
+ HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
+ replicaId);
+ LOG.debug("Created replica region for daughter " + daughterReplica);
+ ServerName sn;
+ if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
+ map.put(daughterReplica, sn);
+ } else {
+ List<ServerName> servers = serverManager.getOnlineServersList();
+ sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
+ map.put(daughterReplica, sn);
+ }
+ }
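
The placement choice in prepareDaughterReplicaForAssignment is deliberate: daughter replica N inherits the server currently hosting parent replica N whenever that location is known, so the replica read path stays local across the split; only when the parent replica's location is unknown does it fall back to a random online server.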
+
+ /**
+ * @return the set of replicas of split parents and merged regions that still need to be closed
+ */
+ public Set<HRegionInfo> getReplicasToClose() {
+ return replicasToClose;
+ }
+
/**
* A region is offline. The new state should be the specified one,
* if not null. If the specified state is null, the new state is Offline.
@@ -3786,6 +3949,25 @@ public class AssignmentManager extends ZooKeeperListener {
// Tell our listeners that a region was closed
sendRegionClosedNotification(regionInfo);
+ // also note that all the replicas of the primary should be closed
+ if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
+ Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
+ c.add(regionInfo);
+ Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
+ Collection<List<HRegionInfo>> allReplicas = map.values();
+ for (List<HRegionInfo> list : allReplicas) {
+ replicasToClose.addAll(list);
+ }
+ }
}
private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 051002e..3007d5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -23,7 +23,11 @@ import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,6 +47,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
@@ -109,12 +114,13 @@ public class CatalogJanitor extends Chore {
/**
* Scans hbase:meta and returns a number of scanned rows, and a map of merged
* regions, and an ordered map of split parents.
- * @return triple of scanned rows, map of merged regions and map of split
- * parent regioninfos
+ * @return triple of scanned rows, map of merged regions, and a pair consisting of
+ * a map of split parent regioninfos and the set of parents that daughter rows still
+ * hold back references to [TODO: the signature is ugly; fix it]
* @throws IOException
*/
- Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents()
- throws IOException {
+ Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+ getMergedRegionsAndSplitParents() throws IOException {
return getMergedRegionsAndSplitParents(null);
}
@@ -124,11 +130,13 @@ public class CatalogJanitor extends Chore {
* null, return merged regions and split parents of all tables, else only the
* specified table
* @param tableName null represents all tables
- * @return triple of scanned rows, and map of merged regions, and map of split
- * parent regioninfos
+ * @return triple of scanned rows, and map of merged regions, and a pair consisting of
+ * a map of split parent regioninfos and the set of parents that daughter rows still
+ * hold back references to [TODO: the signature is ugly; fix it]
* @throws IOException
*/
- Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents(
+ Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+ getMergedRegionsAndSplitParents(
final TableName tableName) throws IOException {
final boolean isTableSpecified = (tableName != null);
// TODO: Only works with single hbase:meta region currently. Fix.
@@ -138,6 +146,7 @@ public class CatalogJanitor extends Chore {
final Map<HRegionInfo, Result> splitParents =
new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
+ final Set<HRegionInfo> parentsReferenced = new HashSet<HRegionInfo>();
// This visitor collects split parents and counts rows in the hbase:meta table
MetaScannerVisitor visitor = new MetaScanner.MetaScannerVisitorBase() {
@@ -152,6 +161,21 @@ public class CatalogJanitor extends Chore {
// Another table, stop scanning
return false;
}
+ int replicaId = 0;
+ NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = r.getNoVersionMap();
+ NavigableMap<byte[], byte[]> infoMap = familyMap.get(HConstants.CATALOG_FAMILY);
+ byte[] parentsReferencedColumn =
+ MetaTableAccessor.getParentReplicaReferenceQualifier(replicaId);
+ SortedMap<byte[], byte[]> refMap = infoMap.tailMap(parentsReferencedColumn, false);
+ for (Entry<byte[], byte[]> entry : refMap.entrySet()) {
+ // qualifiers are sorted, so stop once we are past the parent_<replicaId> columns;
+ // otherwise unrelated columns (e.g. regioninfo) would be parsed as references
+ if (!Bytes.startsWith(entry.getKey(), parentsReferencedColumn)) break;
+ HRegionInfo h = HRegionInfo.parseFromOrNull(entry.getValue());
+ // the referenced parent's row should not be deleted
+ if (h != null) {
+ parentsReferenced.add(RegionReplicaUtil.getRegionInfoForDefaultReplica(h));
+ }
+ }
if (info.isSplitParent()) splitParents.put(info, r);
if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
mergedRegions.put(info, r);
@@ -165,8 +189,9 @@ public class CatalogJanitor extends Chore {
// the start row
MetaScanner.metaScan(server.getConfiguration(), null, visitor, tableName);
- return new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
- count.get(), mergedRegions, splitParents);
+ return new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+ Set<HRegionInfo>>>(count.get(), mergedRegions, new Pair<Map<HRegionInfo, Result>,
+ Set<HRegionInfo>>(splitParents, parentsReferenced));
}
/**
@@ -216,8 +241,8 @@ public class CatalogJanitor extends Chore {
if (!alreadyRunning.compareAndSet(false, true)) {
return 0;
}
- Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple =
- getMergedRegionsAndSplitParents();
+ Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+ scanTriple = getMergedRegionsAndSplitParents();
int count = scanTriple.getFirst();
/**
* clean merge regions first
@@ -244,13 +269,17 @@ public class CatalogJanitor extends Chore {
/**
* clean split parents
*/
- Map<HRegionInfo, Result> splitParents = scanTriple.getThird();
+ Map<HRegionInfo, Result> splitParents = scanTriple.getThird().getFirst();
+ // Get the parents that are still referenced from daughter replica rows; their
+ // rows must not be deleted
+ Set<HRegionInfo> parentsReferenced = scanTriple.getThird().getSecond();
// Now work on our list of found parents. See if any we can clean up.
int splitCleaned = 0;
// regions whose parents are still around
HashSet<String> parentNotCleaned = new HashSet<String>();
for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
+ if (parentsReferenced != null && parentsReferenced.contains(e.getKey())) continue;
if (!parentNotCleaned.contains(e.getKey().getEncodedName()) &&
cleanParent(e.getKey(), e.getValue())) {
splitCleaned++;
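
Taken together, the lifecycle of a parent replica reference is: (1) the split writes a parent_000N column on each daughter row, valued with the serialized HRegionInfo of the parent's replica N; (2) clients resolving daughter replica N are redirected to the still-open parent replica; (3) once the daughter replica is opened, the location update atomically deletes the parent_000N column; (4) until every such column is gone, the CatalogJanitor keeps the split parent's row and files around, since readers may still be redirected to it.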
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 8e1e040..598ded2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,8 +34,10 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.RegionState.State;
@@ -185,7 +189,8 @@ public class RegionStateStore {
try {
int replicaId = hri.getReplicaId();
- Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+ byte[] metaRow = MetaTableAccessor.getMetaKeyForRegion(hri);
+ Put put = new Put(metaRow);
StringBuilder info = new StringBuilder("Updating row ");
info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
if (serverName != null && !serverName.equals(oldServer)) {
@@ -202,6 +207,21 @@ public class RegionStateStore {
}
put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
Bytes.toBytes(state.name()));
+ Collection<Mutation> mutations = null;
+ Collection<byte[]> rows = null;
+ Delete d = null;
+ if (replicaId != 0 && newState.isOpened() && !oldState.isOpened()) {
+ // potentially delete the referenced parent replica column
+ rows = new ArrayList<byte[]>(1);
+ mutations = new ArrayList<Mutation>(2);
+ d = new Delete(metaRow);
+ d.deleteColumn(HConstants.CATALOG_FAMILY,
+ MetaTableAccessor.getParentReplicaReferenceQualifier(replicaId));
+ mutations.add(put);
+ mutations.add(d);
+ rows.add(metaRow);
+ }
LOG.info(info);
// Persist the state change to meta
@@ -209,7 +229,8 @@ public class RegionStateStore {
try {
// Assume meta is pinned to master.
// At least, that's what we want.
- metaRegion.put(put);
+ if (rows != null) metaRegion.mutateRowsWithLocks(mutations, rows);
+ else metaRegion.put(put);
return; // Done here
} catch (Throwable t) {
// In unit tests, meta could be moved away by intention
@@ -227,7 +248,8 @@ public class RegionStateStore {
}
}
synchronized(metaTable) {
- metaTable.put(put);
+ if (rows != null) MetaTableAccessor.multiMutate(metaTable, metaRow, put, d);
+ else metaTable.put(put);
}
} catch (IOException ioe) {
LOG.error("Failed to persist region state " + newState, ioe);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
index 3bb5220..b01434e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
@@ -93,7 +93,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
// Check if this table is being disabled or not
if (this.assignmentManager.getTableStateManager().isTableState(this.regionInfo.getTable(),
- ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+ ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
+ assignmentManager.getReplicasToClose().contains(regionInfo)) {
assignmentManager.offlineDisabledRegion(regionInfo);
return;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 607d042..f6d798a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -266,7 +266,8 @@ public class ServerShutdownHandler extends EventHandler {
} else if (rit != null) {
if (rit.isPendingCloseOrClosing()
&& am.getTableStateManager().isTableState(hri.getTable(),
- ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+ ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
+ am.getReplicasToClose().contains(hri)) {
// If the table was partially disabled and the RS went down, we should clear the RIT
// and remove the node for the region.
// The rit that we use may be stale in case the table was in DISABLING state
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index c5b29e6..46e11e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -37,12 +37,16 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
@@ -489,31 +493,43 @@ public class SplitTransaction {
HRegionInfo copyOfParent = new HRegionInfo(parent);
copyOfParent.setOffline(true);
copyOfParent.setSplit(true);
+ HTable meta = MetaTableAccessor.getMetaHTable(hConnection);
//Put for parent
Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
mutations.add(putParent);
+ byte[] key = MetaTableAccessor.getMetaKeyForRegion(parent);
+ Result result = meta.get(new Get(key));
+ meta.close(); // done with the meta table; the mutations below go through hConnection
+ RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
//Puts for daughters
Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
- addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
- addLocation(putB, serverName, 1);
+ //these are new regions, openSeqNum = 1 is fine.
+ addLocation(putA, serverName, 1, 0);
+ addLocation(putB, serverName, 1, 0);
+ // set the daughter replicas to point back to the corresponding parent replicas
+ for (int i = 1; i < rl.size(); i++) {
+ byte[] parentHri;
+ if (rl.getRegionLocation(i) == null) { // nothing known about this replica; skip it
+ continue;
+ } else {
+ parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray();
+ }
+ putA.addImmutable(HConstants.CATALOG_FAMILY,
+ MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri);
+ putB.addImmutable(HConstants.CATALOG_FAMILY,
+ MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri);
+ }
mutations.add(putA);
mutations.add(putB);
MetaTableAccessor.mutateMetaTable(hConnection, mutations);
}
- public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
- p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
- Bytes.toBytes(sn.getHostAndPort()));
- p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
- Bytes.toBytes(sn.getStartcode()));
- p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
- Bytes.toBytes(openSeqNum));
- return p;
+ public Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId) {
+ return MetaTableAccessor.addLocation(p, sn, openSeqNum, replicaId);
}
/*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 7975c51..618b03c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -149,6 +149,23 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
}
/**
+ * Create a Store File Info from a Reference
+ * @param conf the {@link Configuration} to use
+ * @param fs {@link FileSystem} the file is on
+ * @param fileStatus {@link FileStatus} of the reference file
+ * @param reference the {@link Reference} object
+ * @throws IOException
+ */
+ public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
+ final Reference reference)
+ throws IOException {
+ this.conf = conf;
+ this.fileStatus = fileStatus;
+ this.reference = reference;
+ this.link = null;
+ }
+
+ /**
* Sets the region coprocessor env.
* @param coprocessorHost
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 237e316..74a74a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -83,6 +84,11 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
return new StoreFileInfo(conf, fs, status);
}
+ // After a split, daughter store files are Reference files (top/bottom halves of
+ // the primary's files); read them as references rather than creating HFileLinks
+ if (StoreFileInfo.isReference(status.getPath())) {
+ Reference reference = Reference.read(fs, status.getPath());
+ return new StoreFileInfo(conf, fs, status, reference);
+ }
+
// else create a store file link. The link file does not exists on filesystem though.
HFileLink link = new HFileLink(conf,
HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName()
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 1f1f3d4..bc3a1f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -92,7 +92,7 @@ public class TestReplicasClient {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
- static final AtomicReference<CountDownLatch> cdl =
+ private static final AtomicReference<CountDownLatch> cdl =
new AtomicReference<CountDownLatch>(new CountDownLatch(0));
Random r = new Random();
public SlowMeCopro() {
@@ -129,7 +129,7 @@ public class TestReplicasClient {
private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
- CountDownLatch latch = cdl.get();
+ CountDownLatch latch = getCdl().get();
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");
@@ -148,6 +148,10 @@ public class TestReplicasClient {
LOG.info("We're not the primary replicas.");
}
}
+
+ public static AtomicReference<CountDownLatch> getCdl() {
+ return cdl;
+ }
}
@BeforeClass
@@ -291,7 +295,7 @@ public class TestReplicasClient {
public void testUseRegionWithoutReplica() throws Exception {
byte[] b1 = "testUseRegionWithoutReplica".getBytes();
openRegion(hriSecondary);
- SlowMeCopro.cdl.set(new CountDownLatch(0));
+ SlowMeCopro.getCdl().set(new CountDownLatch(0));
try {
Get g = new Get(b1);
Result r = table.get(g);
@@ -347,14 +351,14 @@ public class TestReplicasClient {
byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
openRegion(hriSecondary);
- SlowMeCopro.cdl.set(new CountDownLatch(1));
+ SlowMeCopro.getCdl().set(new CountDownLatch(1));
try {
Get g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
closeRegion(hriSecondary);
}
}
@@ -465,13 +469,13 @@ public class TestReplicasClient {
LOG.info("sleep and is not stale done");
// But if we ask for stale we will get it
- SlowMeCopro.cdl.set(new CountDownLatch(1));
+ SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
LOG.info("stale done");
@@ -484,14 +488,14 @@ public class TestReplicasClient {
LOG.info("exists not stale done");
// exists works on stale but don't see the put
- SlowMeCopro.cdl.set(new CountDownLatch(1));
+ SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse("The secondary has stale data", r.getExists());
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
LOG.info("exists stale before flush done");
flushRegion(hriPrimary);
@@ -500,28 +504,28 @@ public class TestReplicasClient {
Thread.sleep(1000 + REFRESH_PERIOD * 2);
// get works and is not stale
- SlowMeCopro.cdl.set(new CountDownLatch(1));
+ SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse(r.isEmpty());
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
LOG.info("stale done");
// exists works on stale and we see the put after the flush
- SlowMeCopro.cdl.set(new CountDownLatch(1));
+ SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getExists());
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
LOG.info("exists stale after flush done");
} finally {
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
Delete d = new Delete(b1);
table.delete(d);
@@ -587,7 +591,7 @@ public class TestReplicasClient {
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
- SlowMeCopro.cdl.get().countDown();
+ SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 288d115..2d19e72 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -26,8 +26,10 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -631,16 +634,20 @@ public class TestCatalogJanitor {
new byte[0]);
Thread.sleep(1001);
+ final Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>> pair =
+ new Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>();
final Map<HRegionInfo, Result> splitParents =
new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
splitParents.put(parent, createResult(parent, splita, splitb));
splita.setOffline(true); //simulate that splita goes offline when it is split
splitParents.put(splita, createResult(splita, splitaa,splitab));
+ pair.setFirst(splitParents);
+ pair.setSecond(null);
final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
- doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
- 10, mergedRegions, splitParents)).when(janitor)
+ doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+ Set<HRegionInfo>>>(10, mergedRegions, pair)).when(janitor)
.getMergedRegionsAndSplitParents();
//create ref from splita to parent
@@ -824,6 +831,62 @@ public class TestCatalogJanitor {
janitor.join();
}
+
+ @Test
+ public void testCatalogJanitorOperationsWithReplicas() throws Exception {
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ setRootDirAndCleanIt(htu, "testScanDoesNotCleanRegionsWithExistingParents");
+ Server server = new MockServer(htu);
+ MasterServices services = new MockMasterServices(server);
+
+ final HTableDescriptor htd = createHTableDescriptor();
+
+ // Create regions: aaa->{lastEndKey}, aaa->bbb, bbb->{lastEndKey}
+
+ // Parent
+ HRegionInfo parent = new HRegionInfo(htd.getTableName(), Bytes.toBytes("aaa"),
+ new byte[0], true);
+ // Sleep a second else the encoded name on these regions comes out
+ // same for all with same start key and made in same second.
+ Thread.sleep(1001);
+
+ // Daughter a
+ HRegionInfo splita = new HRegionInfo(htd.getTableName(), Bytes.toBytes("aaa"),
+ Bytes.toBytes("bbb"), true);
+ Thread.sleep(1001);
+
+ // Daughter b
+ HRegionInfo splitb = new HRegionInfo(htd.getTableName(), Bytes.toBytes("bbb"),
+ new byte[0]);
+ Thread.sleep(1001);
+
+ final Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>> pair =
+ new Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>();
+ final Map<HRegionInfo, Result> splitParents =
+ new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
+ splitParents.put(parent, createResult(parent, splita, splitb));
+ pair.setFirst(splitParents);
+ Set<HRegionInfo> referencedParents = new HashSet<HRegionInfo>();
+ referencedParents.add(parent);
+ pair.setSecond(referencedParents);
+
+ final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
+ CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
+ doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+ Set<HRegionInfo>>>(10, mergedRegions, pair)).when(janitor)
+ .getMergedRegionsAndSplitParents();
+
+ //since parent is referenced, it will not delete the row
+ assertEquals(0, janitor.scan());
+
+ pair.setSecond(null);
+ //now, there are no references to parent, and the janitor can cleanup
+ assertEquals(1, janitor.scan());
+
+ services.stop("test finished");
+ janitor.join();
+ }
+
/**
* @param description description of the files for logging
* @param storeFiles the status of the files to log
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 6981415..0b811ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.math.RandomUtils;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -287,6 +289,45 @@ public class TestRegionMergeTransactionOnCluster {
}
}
+ @Test
+ public void testMergeWithReplicas() throws Exception {
+ final TableName tableName = TableName.valueOf("testMergeWithReplicas");
+ // Create table and load data.
+ createTableAndLoadData(master, tableName, 5, 2);
+ List<Pair<HRegionInfo, ServerName>> initialRegionToServers =
+ MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(),
+ master.getShortCircuitConnection(), tableName);
+ // Merge 1st and 2nd region
+ PairOfSameType<HRegionInfo> mergedRegions = mergeRegionsAndVerifyRegionNum(master, tableName,
+ 0, 2, 5 * 2 - 2);
+ List<Pair<HRegionInfo, ServerName>> currentRegionToServers =
+ MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(),
+ master.getShortCircuitConnection(), tableName);
+ List<HRegionInfo> initialRegions = new ArrayList<HRegionInfo>();
+ for (Pair<HRegionInfo, ServerName> p : initialRegionToServers) {
+ initialRegions.add(p.getFirst());
+ }
+ List<HRegionInfo> currentRegions = new ArrayList<HRegionInfo>();
+ for (Pair<HRegionInfo, ServerName> p : currentRegionToServers) {
+ currentRegions.add(p.getFirst());
+ }
+ assertTrue(initialRegions.contains(mergedRegions.getFirst())); //this is the first region
+ assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+ mergedRegions.getFirst(), 1))); //this is the replica of the first region
+ assertTrue(initialRegions.contains(mergedRegions.getSecond())); //this is the second region
+ assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+ mergedRegions.getSecond(), 1))); //this is the replica of the second region
+ assertTrue(!initialRegions.contains(currentRegions.get(0))); //this is the new region
+ assertTrue(!initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+ currentRegions.get(0), 1))); //replica of the new region
+ assertTrue(currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+ currentRegions.get(0), 1))); //replica of the new region
+ assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+ mergedRegions.getFirst(), 1))); //replica of the merged region
+ assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
+ mergedRegions.getSecond(), 1))); //replica of the merged region
+ }
+
private PairOfSameType<HRegionInfo> mergeRegionsAndVerifyRegionNum(
HMaster master, TableName tablename,
int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
@@ -335,11 +376,11 @@ public class TestRegionMergeTransactionOnCluster {
private HTable createTableAndLoadData(HMaster master, TableName tablename)
throws Exception {
- return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM);
+ return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1);
}
private HTable createTableAndLoadData(HMaster master, TableName tablename,
- int numRegions) throws Exception {
+ int numRegions, int replication) throws Exception {
assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
byte[][] splitRows = new byte[numRegions - 1][];
for (int i = 0; i < splitRows.length; i++) {
@@ -347,6 +388,9 @@ public class TestRegionMergeTransactionOnCluster {
}
HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
+ if (replication > 1) {
+ HBaseTestingUtility.setReplicas(admin, tablename, replication);
+ }
loadData(table);
verifyRowCount(table, ROWSIZE);
@@ -356,7 +400,7 @@ public class TestRegionMergeTransactionOnCluster {
while (System.currentTimeMillis() < timeout) {
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
master.getZooKeeper(), master.getShortCircuitConnection(), tablename);
- if (tableRegions.size() == numRegions)
+ if (tableRegions.size() == numRegions * replication)
break;
Thread.sleep(250);
}
@@ -364,7 +408,7 @@ public class TestRegionMergeTransactionOnCluster {
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
master.getZooKeeper(), master.getShortCircuitConnection(), tablename);
LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
- assertEquals(numRegions, tableRegions.size());
+ assertEquals(numRegions * replication, tableRegions.size());
return table;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index b7626e9..2fbfe3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -28,8 +28,10 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
@@ -38,6 +40,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -57,7 +62,10 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
@@ -65,6 +73,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro;
import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
@@ -80,6 +89,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -128,6 +138,8 @@ public class TestSplitTransactionOnCluster {
TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000);
useZKForAssignment = TESTING_UTIL.getConfiguration().getBoolean(
"hbase.assignment.usezk", false);
+ TESTING_UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
+ MetaUpdateObserver.class.getName());
TESTING_UTIL.startMiniCluster(NB_SERVERS);
}
@@ -924,6 +936,107 @@ public class TestSplitTransactionOnCluster {
}
}
+ @Test
+ public void testSplitWithRegionReplicas() throws Exception {
+ ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL);
+ final TableName tableName =
+ TableName.valueOf("foobar");
+ HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar");
+ htd.setRegionReplication(2);
+ htd.addCoprocessor(SlowMeCopro.class.getName());
+ // Create table then get the single region for our new table.
+ HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")},
+ TESTING_UTIL.getConfiguration());
+ // wait for both replicas of the single region to come online
+ int count;
+ do {
+ Thread.sleep(100);
+ count = cluster.getRegions(tableName).size();
+ } while (count != 2);
+ List<HRegion> regions = null;
+ try {
+ regions = cluster.getRegions(tableName);
+ int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
+ HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
+ insertData(tableName.getName(), admin, t);
+ // Turn off balancer so it doesn't cut in and mess up our placements.
+ admin.setBalancerRunning(false, true);
+ // Turn off the meta scanner so it don't remove parent on us.
+ cluster.getMaster().setCatalogJanitorEnabled(false);
+ boolean tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(),
+ tableName);
+ assertEquals("The specified table should be present.", true, tableExists);
+ final HRegion region = findSplittableRegion(regions);
+ assertTrue("not able to find a splittable region", region != null);
+ regionServerIndex = cluster.getServerWith(region.getRegionName());
+ regionServer = cluster.getRegionServer(regionServerIndex);
+ String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
+ region.getRegionInfo().getEncodedName());
+ regionServer.getZooKeeper().sync(node);
+ SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2"));
+ MetaUpdateObserver.deleteCellCount = 0;
+ try {
+ st.prepare();
+ MetaUpdateObserver.shouldRunChecks = true;
+ st.execute(regionServer, regionServer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Split execution should have succeeded with no exceptions thrown " + e);
+ }
+ TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
+ tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(),
+ tableName);
+ assertEquals("The specified table should be present.", true, tableExists);
+ // a STRONG get must be served by the primary daughter and must not be stale
+ byte[] b1 = "row1".getBytes();
+ Get g = new Get(b1);
+ g.setConsistency(Consistency.STRONG);
+ // The following GET will make a trip to the meta to get the new location of the 1st daughter
+ // In the process it will also get the location of the replica of the daughter (initially
+ // pointing to the parent's replica)
+ Result r = t.get(g);
+ Assert.assertFalse(r.isStale());
+ LOG.info("exists stale after flush done");
+
+ SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setConsistency(Consistency.TIMELINE);
+ // This will succeed because in the previous GET we get the location of the replica
+ r = t.get(g);
+ Assert.assertTrue(r.isStale());
+ SlowMeCopro.getCdl().get().countDown();
+ // Ensure that we did the right meta updates for the parent replica reference columns.
+ // We would have done a couple of updates. We would have added the 'parent_0001'
+ // column to the daughter rows, back referencing the parent replicas.
+ // We would have deleted the parent_0001 column subsequently.
+ // The deleteCellCount is 2 because the delete would be done
+ // for all replica location updates in the meta (for the two
+ // replicas of the daughters)
+ LOG.debug("DeleteCellCount " + MetaUpdateObserver.deleteCellCount + " PutCellCount "
+ + MetaUpdateObserver.putCellCount + " ");
+ Assert.assertEquals(2, MetaUpdateObserver.deleteCellCount);
+ Assert.assertEquals(2, MetaUpdateObserver.putCellCount);
+ // the two puts and the two deletes must each target distinct daughter rows
+ Assert.assertFalse(Arrays.equals(MetaUpdateObserver.rowsInPut[0],
+ MetaUpdateObserver.rowsInPut[1]));
+ Assert.assertFalse(Arrays.equals(MetaUpdateObserver.rowsInDelete[0],
+ MetaUpdateObserver.rowsInDelete[1]));
+ } finally {
+ MetaUpdateObserver.shouldRunChecks = false;
+ SlowMeCopro.getCdl().get().countDown();
+ if (regions != null) {
+ String node = ZKAssign.getNodeName(zkw, regions.get(0).getRegionInfo()
+ .getEncodedName());
+ ZKUtil.deleteNodeFailSilent(zkw, node);
+ }
+ admin.setBalancerRunning(true, false);
+ cluster.getMaster().setCatalogJanitorEnabled(true);
+ t.close();
+ }
+ }
+
private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException,
InterruptedException {
Put p = new Put(Bytes.toBytes("row1"));
@@ -1192,7 +1305,7 @@ public class TestSplitTransactionOnCluster {
private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {
for (int i = 0; i < 5; ++i) {
for (HRegion r: regions) {
- if (r.isSplittable()) {
+ if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) {
return(r);
}
}
@@ -1388,6 +1501,51 @@ public class TestSplitTransactionOnCluster {
}
}
+ public static class MetaUpdateObserver extends BaseRegionObserver {
+ static boolean shouldRunChecks = false;
+ static int deleteCellCount;
+ static int putCellCount;
+ static byte[][] rowsInPut = new byte[10][];
+ static byte[][] rowsInDelete = new byte[10][];
+ @Override
+ public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
+ if (e.getEnvironment().getRegion().getTableDesc().getTableName().
+ equals(TableName.META_TABLE_NAME) && shouldRunChecks) {
+ NavigableMap<byte[], List<Cell>> map = delete.getFamilyCellMap();
+ List<Cell> cells = map.get(HConstants.CATALOG_FAMILY);
+ if (cells == null) return;
+ for (Cell c : cells) {
+ if (c.getTypeByte() == KeyValue.Type.Delete.getCode() &&
+ Arrays.equals(CellUtil.cloneQualifier(c),
+ MetaTableAccessor.getParentReplicaReferenceQualifier(1))) {
+ rowsInDelete[deleteCellCount++] = CellUtil.cloneRow(c);
+ }
+ }
+ e.bypass();
+ }
+ }
+
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Put put, final WALEdit edit, final Durability durability) {
+ if (e.getEnvironment().getRegion().getTableDesc().getTableName().
+ equals(TableName.META_TABLE_NAME) && shouldRunChecks) {
+ NavigableMap<byte[], List<Cell>> map = put.getFamilyCellMap();
+ List<Cell> cells = map.get(HConstants.CATALOG_FAMILY);
+ if (cells == null) return;
+ for (Cell c : cells) {
+ if (c.getTypeByte() == KeyValue.Type.Put.getCode()) {
+ if (Arrays.equals(CellUtil.cloneQualifier(c),
+ MetaTableAccessor.getParentReplicaReferenceQualifier(1))) {
+ rowsInPut[putCellCount++] = CellUtil.cloneRow(c);
+ }
+ }
+ }
+ }
+ }
+ }
+
public static class MockedRegionObserver extends BaseRegionObserver {
private SplitTransaction st = null;
private PairOfSameType daughterRegions = null;
@@ -1428,8 +1586,8 @@ public class TestSplitTransactionOnCluster {
daughterRegions.getFirst().getRegionInfo());
Put putB = MetaTableAccessor.makePutFromRegionInfo(
daughterRegions.getSecond().getRegionInfo());
- st.addLocation(putA, rs.getServerName(), 1);
- st.addLocation(putB, rs.getServerName(), 1);
+ st.addLocation(putA, rs.getServerName(), 1, 0);
+ st.addLocation(putB, rs.getServerName(), 1, 0);
metaEntries.add(putA);
metaEntries.add(putB);
}
| |