diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index c01e722..33ccf1e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -188,7 +189,7 @@ public class MetaTableAccessor { * @return An {@link HTable} for hbase:meta * @throws IOException */ - static HTable getMetaHTable(final HConnection hConnection) + public static HTable getMetaHTable(final HConnection hConnection) throws IOException { return getHTable(hConnection, TableName.META_TABLE_NAME); } @@ -749,8 +750,15 @@ public class MetaTableAccessor { if (replicaId < 0) { break; } - - locations.add(getRegionLocation(r, regionInfo, replicaId)); + byte[] parent = getParentReplicaReferenceQualifier(replicaId); + // If the parent replica reference column is present, its value is the + // HRegionInfo of the (split) parent's replica. Use that as the location of the + // daughter replica until the daughter replica is actually created and assigned + HRegionInfo h = getHRegionInfo(r, parent); + if (h == null) { + h = regionInfo; + } + locations.add(getRegionLocation(r, h, replicaId)); } return new RegionLocations(locations); @@ -1211,6 +1219,9 @@ public class MetaTableAccessor { HRegionInfo copyOfParent = new HRegionInfo(parent); copyOfParent.setOffline(true); copyOfParent.setSplit(true); + byte[] key = MetaTableAccessor.getMetaKeyForRegion(parent); + Result result = meta.get(new Get(key)); + RegionLocations rl = MetaTableAccessor.getRegionLocations(result); //Put for parent Put putParent = makePutFromRegionInfo(copyOfParent); @@ -1222,6 +1233,20 @@ public class MetaTableAccessor { addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine. addLocation(putB, sn, 1, splitB.getReplicaId()); + // bootstrap the daughter replicas to point to the locations of the old parent replicas + // via the parent replica reference columns (see getParentReplicaReferenceQualifier) + for (int i = 1; i < rl.size(); i++) { + byte[] parentHri; + if (rl.getRegionLocation(i) == null) { // if null then we don't know anything about this replica + continue; + } else { + parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray(); + } + putA.addImmutable(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri); + putB.addImmutable(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri); + } byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER); multiMutate(meta, tableRow, putParent, putA, putB); @@ -1229,11 +1254,23 @@ public class MetaTableAccessor { meta.close(); } } + /** + * Returns the column qualifier of the parent replica reference column for the given replicaId + * @param replicaId the replicaId of the region + * @return a byte[] for the parent replica reference column qualifier + */ + @VisibleForTesting + public static byte[] getParentReplicaReferenceQualifier(int replicaId) { + return replicaId == 0 + ?
HConstants.PARENT_REPLICA_REFERENCE + : Bytes.toBytes(HConstants.PARENT_REPLICA_REFERENCE_STR + META_REPLICA_ID_DELIMITER + + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId)); + } /** * Performs an atomic multi-Mutate operation against the given table. */ - private static void multiMutate(HTable table, byte[] row, Mutation... mutations) + public static void multiMutate(HTableInterface table, byte[] row, Mutation... mutations) throws IOException { CoprocessorRpcChannel channel = table.coprocessorService(row); MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder @@ -1295,9 +1332,19 @@ public class MetaTableAccessor { HRegionInfo regionInfo, ServerName sn, long openSeqNum) throws IOException { // region replicas are kept in the primary region's row - Put put = new Put(getMetaKeyForRegion(regionInfo)); + byte[] metaRow = getMetaKeyForRegion(regionInfo); + Put put = new Put(metaRow); addLocation(put, sn, openSeqNum, regionInfo.getReplicaId()); - putToMetaTable(hConnection, put); + if (regionInfo.getReplicaId() != 0) { + // The actual daughter replica location is being created above + // Delete the parent reference so replica lookups are not redirected there anymore + Delete d = new Delete(metaRow); + d.deleteColumn(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getParentReplicaReferenceQualifier(regionInfo.getReplicaId())); + multiMutate(MetaTableAccessor.getMetaHTable(hConnection), metaRow, put, d); + } else { + putToMetaTable(hConnection, put); + } LOG.info("Updated row " + regionInfo.getRegionNameAsString() + " with server=" + sn); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 93209fd..e9c9195 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -430,6 +430,12 @@ public final class HConstants { public static final byte [] SERVERNAME_QUALIFIER = Bytes.toBytes(SERVERNAME_QUALIFIER_STR); + /** The parent replica reference qualifier */ + public static final String PARENT_REPLICA_REFERENCE_STR = "parent"; + /** The parent replica reference qualifier */ + public static final byte [] PARENT_REPLICA_REFERENCE = + Bytes.toBytes(PARENT_REPLICA_REFERENCE_STR); + /** The lower-half split region column qualifier */ public static final byte [] SPLITA_QUALIFIER = Bytes.toBytes("splitA"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 2888e1e..11b9ec2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -30,8 +30,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; @@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import 
org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.Result; @@ -148,6 +151,8 @@ public class AssignmentManager extends ZooKeeperListener { final private KeyLocker<String> locker = new KeyLocker<String>(); + Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>()); + /** * Map of regions to reopen after the schema of a table is changed. Key - * encoded region name, value - HRegionInfo @@ -616,6 +621,12 @@ public class AssignmentManager extends ZooKeeperListener { LOG.info("Clean cluster startup. Assigning user regions"); assignAllUserRegions(allRegions); } + // unassign replicas of the split parents and the merged regions + // the daughter replicas are opened in assignAllUserRegions if they were + // not already opened. + for (HRegionInfo h : replicasToClose) { + unassign(h); + } return failover; } @@ -798,7 +809,11 @@ public class AssignmentManager extends ZooKeeperListener { case RS_ZK_REGION_FAILED_OPEN: // Region is closed, insert into RIT and handle it regionStates.updateRegionState(regionInfo, State.CLOSED, sn); - invokeAssign(regionInfo); + if (!replicasToClose.contains(regionInfo)) { + invokeAssign(regionInfo); + } else { + offlineDisabledRegion(regionInfo); + } break; case M_ZK_REGION_OFFLINE: @@ -2207,7 +2222,7 @@ public class AssignmentManager extends ZooKeeperListener { private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { if (this.tableStateManager.isTableState(region.getTable(), ZooKeeperProtos.Table.State.DISABLED, - ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) { LOG.info("Table " + region.getTable() + " is disabled or disabling;" + " skipping assign of " + region.getRegionNameAsString()); offlineDisabledRegion(region); @@ -2461,7 +2476,7 @@ public class AssignmentManager extends ZooKeeperListener { lock.unlock(); // Region is expected to be reassigned afterwards - if (reassign && regionStates.isRegionOffline(region)) { + if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) { assign(region, true); } } @@ -2768,6 +2783,19 @@ public class AssignmentManager extends ZooKeeperListener { LOG.debug("null result from meta - ignoring but this is strange."); continue; } + // Keep track of the replicas to close. These were the replicas of the originally + // unmerged regions. The master may have tried to close them before, but may not + // have succeeded (e.g. because it crashed). + PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result); + if (p.getFirst() != null && p.getSecond() != null) { + int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst(). + getTable()).getRegionReplication(); + for (HRegionInfo merge : p) { + for (int i = 1; i < numReplicas; i++) { + replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i)); + } + } + } RegionLocations rl = MetaTableAccessor.getRegionLocations(result); if (rl == null) continue; HRegionLocation[] locations = rl.getRegionLocations(); @@ -2777,6 +2805,14 @@ public class AssignmentManager extends ZooKeeperListener { if (regionInfo == null) continue; int replicaId = regionInfo.getReplicaId(); State state = RegionStateStore.getRegionState(result, replicaId); + // Keep track of the replicas to close. These were the replicas of the split parents + // from the previous life of the master. The master should have closed them before, + // but may not have managed to (e.g. because it crashed). + if (replicaId == 0 && state.equals(State.SPLIT)) { + for (HRegionLocation h : locations) { + replicasToClose.add(h.getRegionInfo()); + } + } ServerName lastHost = hrl.getServerName(); ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId); regionStates.createRegionState(regionInfo, state, regionLocation, lastHost); @@ -3380,7 +3416,8 @@ public class AssignmentManager extends ZooKeeperListener { // When there are more than one region server a new RS is selected as the // destination and the same is updated in the region plan. (HBASE-5546) if (getTableStateManager().isTableState(hri.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + replicasToClose.contains(hri)) { offlineDisabledRegion(hri); return; } @@ -3420,7 +3457,8 @@ public class AssignmentManager extends ZooKeeperListener { private void onRegionClosed(final HRegionInfo hri) { if (getTableStateManager().isTableState(hri.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + replicasToClose.contains(hri)) { offlineDisabledRegion(hri); return; } @@ -3432,8 +3470,8 @@ public class AssignmentManager extends ZooKeeperListener { } private String onRegionSplit(ServerName sn, TransitionCode code, - HRegionInfo p, HRegionInfo a, HRegionInfo b) { - RegionState rs_p = regionStates.getRegionState(p); + final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { + final RegionState rs_p = regionStates.getRegionState(p); RegionState rs_a = regionStates.getRegionState(a); RegionState rs_b = regionStates.getRegionState(b); if (!(rs_p.isOpenOrSplittingOnServer(sn) @@ -3459,6 +3497,15 @@ public class AssignmentManager extends ZooKeeperListener { ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { invokeUnAssign(a); invokeUnAssign(b); + } else { + Callable<Object> splitReplicasCallable = new Callable<Object>() { + @Override + public Object call() { + doSplittingOfReplicas(p, a, b); + return null; + } + }; + threadPoolExecutorService.submit(splitReplicasCallable); } } else if (code == TransitionCode.SPLIT_PONR) { try { @@ -3481,7 +3528,7 @@ public class AssignmentManager extends ZooKeeperListener { } private String onRegionMerge(ServerName sn, TransitionCode code, - HRegionInfo p, HRegionInfo a, HRegionInfo b) { + final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { RegionState rs_p = regionStates.getRegionState(p); RegionState rs_a = regionStates.getRegionState(a); RegionState rs_b = regionStates.getRegionState(b); @@ -3508,6 +3555,15 @@ public class AssignmentManager extends ZooKeeperListener { if (getTableStateManager().isTableState(p.getTable(), ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { invokeUnAssign(p); + } else { + Callable<Object> mergeReplicasCallable = new Callable<Object>() { + @Override + public Object call() { + doMergingOfReplicas(p, a, b); + return null; + } + }; + threadPoolExecutorService.submit(mergeReplicasCallable); } } else if (code == TransitionCode.MERGE_PONR) { try { @@ -3614,6 +3670,7 @@ public class AssignmentManager extends ZooKeeperListener { } if (et == EventType.RS_ZK_REGION_MERGED) { + doMergingOfReplicas(p, hri_a, hri_b); LOG.debug("Handling MERGED event for " + encodedName + "; deleting node"); // Remove region from ZK
try { @@ -3740,6 +3797,8 @@ public class AssignmentManager extends ZooKeeperListener { } if (et == EventType.RS_ZK_REGION_SPLIT) { + // split replicas + doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b); LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node"); // Remove region from ZK try { @@ -3772,6 +3831,110 @@ public class AssignmentManager extends ZooKeeperListener { return true; } + private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a, + final HRegionInfo hri_b) { + // Close the replicas for the original unmerged regions. Create/assign new replicas + // for the merged parent. + List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>(); + unmergedRegions.add(hri_a); + unmergedRegions.add(hri_b); + Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions); + Collection<List<HRegionInfo>> c = map.values(); + for (List<HRegionInfo> l : c) { + for (HRegionInfo h : l) { + if (!RegionReplicaUtil.isDefaultReplica(h)) { + LOG.debug("Unassigning un-merged replica " + h); + unassign(h); + } + } + } + int numReplicas = 1; + try { + numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()). + getRegionReplication(); + } catch (IOException e) { + LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() + + " due to " + e.getMessage() + ". The assignment of replicas for the merged region " + + "will not be done"); + } + List<HRegionInfo> regions = new ArrayList<HRegionInfo>(); + for (int i = 1; i < numReplicas; i++) { + regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i)); + } + try { + assign(regions); + } catch (IOException ioe) { + LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " + + ioe.getMessage()); + } catch (InterruptedException ie) { + LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " + + ie.getMessage()); + } + } + + private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a, + final HRegionInfo hri_b) { + // Create new regions for the replicas, and assign them to match the + // current replica assignments. If replica1 of the parent is assigned to RS1, + // the replica1s of the daughters will be placed on the same server + int numReplicas = 1; + try { + numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()). + getRegionReplication(); + } catch (IOException e) { + LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() + + " due to " + e.getMessage() + ". The assignment of daughter " + "replicas will not be done"); + } + // unassign the old replicas + List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>(); + parentRegion.add(parentHri); + Map<ServerName, List<HRegionInfo>> currentAssign = + regionStates.getRegionAssignments(parentRegion); + Collection<List<HRegionInfo>> c = currentAssign.values(); + for (List<HRegionInfo> l : c) { + for (HRegionInfo h : l) { + if (!RegionReplicaUtil.isDefaultReplica(h)) { + LOG.debug("Unassigning parent's replica " + h); + unassign(h); + } + } + } + // assign daughter replicas + Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>(); + for (int i = 1; i < numReplicas; i++) { + prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map); + prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map); + } + try { + assign(map); + } catch (IOException e) { + LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)"); + } catch (InterruptedException e) { + LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)"); + } + } + + private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri, + int replicaId, Map<HRegionInfo, ServerName> map) { + HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId); + HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri, + replicaId); + LOG.debug("Created replica region for daughter " + daughterReplica); + ServerName sn; + if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) { + map.put(daughterReplica, sn); + } else { + List<ServerName> servers = serverManager.getOnlineServersList(); + sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size())); + map.put(daughterReplica, sn); + } + } + + public Set<HRegionInfo> getReplicasToClose() { + return replicasToClose; + } + /** * A region is offline. The new state should be the specified one, * if not null. If the specified state is null, the new state is Offline.
@@ -3786,6 +3949,17 @@ public class AssignmentManager extends ZooKeeperListener { // Tell our listeners that a region was closed sendRegionClosedNotification(regionInfo); + // also note that all the replicas of the primary should be closed when the + // primary is split or merged + if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) { + Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1); + c.add(regionInfo); + Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c); + Collection<List<HRegionInfo>> allReplicas = map.values(); + for (List<HRegionInfo> list : allReplicas) { + replicasToClose.addAll(list); + } + } } private void sendRegionOpenedNotification(final HRegionInfo regionInfo, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 051002e..3007d5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -23,7 +23,11 @@ import java.io.IOException; import java.util.Comparator; import java.util.HashSet; import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -43,6 +47,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.util.Bytes; @@ -109,12 +114,13 @@ public class CatalogJanitor extends Chore { /** * Scans hbase:meta and returns a number of scanned rows, and a map of merged * regions, and an ordered map of split parents.
- * @return triple of scanned rows, map of merged regions and map of split - * parent regioninfos + * @return triple of scanned rows, map of merged regions and a pair consisting of the + * map of split parent regioninfos and the set of parents to which daughter replicas + * still hold back references [TODO: the signature is ugly; fix it] * @throws IOException */ - Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents() - throws IOException { + Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>> + getMergedRegionsAndSplitParents() throws IOException { return getMergedRegionsAndSplitParents(null); } @@ -124,11 +130,13 @@ public class CatalogJanitor extends Chore { * null, return merged regions and split parents of all tables, else only the * specified table * @param tableName null represents all tables - * @return triple of scanned rows, and map of merged regions, and map of split - * parent regioninfos + * @return triple of scanned rows, and map of merged regions, and a pair consisting of the + * map of split parent regioninfos and the set of parents to which daughter replicas + * still hold back references [TODO: the signature is ugly; fix it] * @throws IOException */ - Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents( + Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>> + getMergedRegionsAndSplitParents( final TableName tableName) throws IOException { final boolean isTableSpecified = (tableName != null); // TODO: Only works with single hbase:meta region currently. Fix. @@ -138,6 +146,7 @@ public class CatalogJanitor extends Chore { final Map<HRegionInfo, Result> splitParents = new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator()); final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>(); + final Set<HRegionInfo> parentsReferenced = new HashSet<HRegionInfo>(); // This visitor collects split parents and counts rows in the hbase:meta table MetaScannerVisitor visitor = new MetaScanner.MetaScannerVisitorBase() { @@ -152,6 +161,24 @@ public class CatalogJanitor extends Chore { // Another table, stop scanning return false; } + int replicaId = 0; + NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = r.getNoVersionMap(); + NavigableMap<byte[], byte[]> infoMap = familyMap.get(HConstants.CATALOG_FAMILY); + byte[] parentsReferencedColumn = + MetaTableAccessor.getParentReplicaReferenceQualifier(replicaId); + SortedMap<byte[], byte[]> serverMap = infoMap.tailMap(parentsReferencedColumn, false); + if (!serverMap.isEmpty()) { + for (Entry<byte[], byte[]> entry : serverMap.entrySet()) { + if (!Bytes.startsWith(entry.getKey(), parentsReferencedColumn)) { + break; // the map is sorted; we are past the parent replica reference columns + } + HRegionInfo h = HRegionInfo.parseFromOrNull(entry.getValue()); + // the referenced parent row should not be deleted + if (h != null) { + parentsReferenced.add(RegionReplicaUtil.getRegionInfoForDefaultReplica(h)); + } + } + } if (info.isSplitParent()) splitParents.put(info, r); if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) { mergedRegions.put(info, r); @@ -165,8 +189,9 @@ public class CatalogJanitor extends Chore { // the start row MetaScanner.metaScan(server.getConfiguration(), null, visitor, tableName); - return new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>( - count.get(), mergedRegions, splitParents); + return new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, + Set<HRegionInfo>>>(count.get(), mergedRegions, new Pair<Map<HRegionInfo, Result>, + Set<HRegionInfo>>(splitParents, parentsReferenced)); } /** @@ -216,8 +241,8 @@ public class CatalogJanitor extends Chore { if (!alreadyRunning.compareAndSet(false, true)) { return 0; } - Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple = - getMergedRegionsAndSplitParents(); + Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>> + scanTriple = getMergedRegionsAndSplitParents(); int count = scanTriple.getFirst(); /** * clean merge regions first */ @@ -244,13 +269,17 @@ public class CatalogJanitor extends Chore { /** * clean split parents */ - Map<HRegionInfo, Result> splitParents = scanTriple.getThird(); + Map<HRegionInfo, Result> splitParents = scanTriple.getThird().getFirst(); + // Get the parents that are referenced from the daughter replicas, and don't + // delete the corresponding rows + Set<HRegionInfo> parentsReferenced = scanTriple.getThird().getSecond(); // Now work on our list of found parents. See if any we can clean up. int splitCleaned = 0; // regions whose parents are still around HashSet<String> parentNotCleaned = new HashSet<String>(); for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) { + if (parentsReferenced != null && parentsReferenced.contains(e.getKey())) continue; if (!parentNotCleaned.contains(e.getKey().getEncodedName()) && cleanParent(e.getKey(), e.getValue())) { splitCleaned++; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 8e1e040..598ded2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,8 +34,10 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.RegionState.State; @@ -185,7 +189,8 @@ public class RegionStateStore { try { int replicaId = hri.getReplicaId(); - Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri)); + byte[] metaRow = MetaTableAccessor.getMetaKeyForRegion(hri); + Put put = new Put(metaRow); StringBuilder info = new StringBuilder("Updating row "); info.append(hri.getRegionNameAsString()).append(" with state=").append(state); if (serverName != null && !serverName.equals(oldServer)) { @@ -202,6 +207,20 @@ public class RegionStateStore { } put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId), Bytes.toBytes(state.name())); + Collection<Mutation> mutations = null; + Collection<byte[]> rows = null; + Delete d = null; + if (replicaId != 0 && newState.isOpened() && !oldState.isOpened()) { + // potentially delete the referenced parent replica column + rows = new ArrayList<byte[]>(1); + mutations = new ArrayList<Mutation>(2); + d = new Delete(metaRow); + d.deleteColumn(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getParentReplicaReferenceQualifier(replicaId)); + mutations.add(put); + mutations.add(d); + rows.add(metaRow); + } LOG.info(info); // Persist the state change to meta @@ -209,7 +229,8 @@ public class RegionStateStore { try { // Assume meta is pinned to master. // At least, that's what we want.
- metaRegion.put(put); + if (rows != null) metaRegion.mutateRowsWithLocks(mutations, rows); + else metaRegion.put(put); return; // Done here } catch (Throwable t) { // In unit tests, meta could be moved away by intention @@ -227,7 +248,8 @@ public class RegionStateStore { } } synchronized(metaTable) { - metaTable.put(put); + if (rows != null) MetaTableAccessor.multiMutate(metaTable, metaRow, put, d); + else metaTable.put(put); } } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java index 3bb5220..b01434e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java @@ -93,7 +93,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName()); // Check if this table is being disabled or not if (this.assignmentManager.getTableStateManager().isTableState(this.regionInfo.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + assignmentManager.getReplicasToClose().contains(regionInfo)) { assignmentManager.offlineDisabledRegion(regionInfo); return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 607d042..f6d798a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -266,7 +266,8 @@ public class ServerShutdownHandler extends EventHandler { } else if (rit != null) { if (rit.isPendingCloseOrClosing() && am.getTableStateManager().isTableState(hri.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + am.getReplicasToClose().contains(hri)) { // If the table was partially disabled and the RS went down, we should clear the RIT // and remove the node for the region. 
// The rit that we use may be stale in case the table was in DISABLING state diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index c5b29e6..46e11e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -37,12 +37,16 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; @@ -489,31 +493,43 @@ public class SplitTransaction { HRegionInfo copyOfParent = new HRegionInfo(parent); copyOfParent.setOffline(true); copyOfParent.setSplit(true); + HTable meta = MetaTableAccessor.getMetaHTable(hConnection); //Put for parent Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent); MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB); mutations.add(putParent); + byte[] key = MetaTableAccessor.getMetaKeyForRegion(parent); + Result result = meta.get(new Get(key)); + RegionLocations rl = MetaTableAccessor.getRegionLocations(result); //Puts for daughters Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA); Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB); - addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine. - addLocation(putB, serverName, 1); + //these are new regions, openSeqNum = 1 is fine. 
+ addLocation(putA, serverName, 1, 0); + addLocation(putB, serverName, 1, 0); + // set the replicas to point to the parent replicas + for (int i = 1; i < rl.size(); i++) { + byte[] parentHri; + if (rl.getRegionLocation(i) == null) { // if null then we don't know anything about this replica + continue; + } else { + parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray(); + } + putA.addImmutable(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri); + putB.addImmutable(HConstants.CATALOG_FAMILY, + MetaTableAccessor.getParentReplicaReferenceQualifier(i), parentHri); + } mutations.add(putA); mutations.add(putB); MetaTableAccessor.mutateMetaTable(hConnection, mutations); } - public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, - Bytes.toBytes(sn.getHostAndPort())); - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, - Bytes.toBytes(sn.getStartcode())); - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, - Bytes.toBytes(openSeqNum)); - return p; + public Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId) { + return MetaTableAccessor.addLocation(p, sn, openSeqNum, replicaId); } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 7975c51..618b03c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -149,6 +149,23 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> { } /** + * Create a Store File Info from a Reference + * @param conf the configuration to use + * @param fs the filesystem holding the file + * @param fileStatus the status of the referenced file + * @param reference the {@link Reference} pointing at the parent region's file + * @throws IOException + */ + public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus, + final Reference reference) + throws IOException { + this.conf = conf; + this.fileStatus = fileStatus; + this.reference = reference; + this.link = null; + } + + /** * Sets the region coprocessor env. * @param coprocessorHost */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 237e316..74a74a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; @@ -83,6 +84,11 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { return new StoreFileInfo(conf, fs, status); } + if (StoreFileInfo.isReference(status.getPath())) { + Reference reference = Reference.read(fs, status.getPath()); + return new StoreFileInfo(conf, fs, status, reference); + } + // else create a store file link. The link file does not exist on the filesystem, though.
HFileLink link = new HFileLink(conf, HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 1f1f3d4..bc3a1f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -92,7 +92,7 @@ public class TestReplicasClient { static final AtomicLong sleepTime = new AtomicLong(0); static final AtomicBoolean slowDownNext = new AtomicBoolean(false); static final AtomicInteger countOfNext = new AtomicInteger(0); - static final AtomicReference cdl = + private static final AtomicReference cdl = new AtomicReference(new CountDownLatch(0)); Random r = new Random(); public SlowMeCopro() { @@ -129,7 +129,7 @@ public class TestReplicasClient { private void slowdownCode(final ObserverContext e) { if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { - CountDownLatch latch = cdl.get(); + CountDownLatch latch = getCdl().get(); try { if (sleepTime.get() > 0) { LOG.info("Sleeping for " + sleepTime.get() + " ms"); @@ -148,6 +148,10 @@ public class TestReplicasClient { LOG.info("We're not the primary replicas."); } } + + public static AtomicReference getCdl() { + return cdl; + } } @BeforeClass @@ -291,7 +295,7 @@ public class TestReplicasClient { public void testUseRegionWithoutReplica() throws Exception { byte[] b1 = "testUseRegionWithoutReplica".getBytes(); openRegion(hriSecondary); - SlowMeCopro.cdl.set(new CountDownLatch(0)); + SlowMeCopro.getCdl().set(new CountDownLatch(0)); try { Get g = new Get(b1); Result r = table.get(g); @@ -347,14 +351,14 @@ public class TestReplicasClient { byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); openRegion(hriSecondary); - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); try { Get g = new Get(b1); g.setConsistency(Consistency.TIMELINE); Result r = table.get(g); Assert.assertTrue(r.isStale()); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); closeRegion(hriSecondary); } } @@ -465,13 +469,13 @@ public class TestReplicasClient { LOG.info("sleep and is not stale done"); // But if we ask for stale we will get it - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("stale done"); @@ -484,14 +488,14 @@ public class TestReplicasClient { LOG.info("exists not stale done"); // exists works on stale but don't see the put - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse("The secondary has stale data", r.getExists()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("exists stale before flush done"); flushRegion(hriPrimary); @@ -500,28 +504,28 @@ public class TestReplicasClient { Thread.sleep(1000 + REFRESH_PERIOD * 2); // get works and is not stale - SlowMeCopro.cdl.set(new CountDownLatch(1)); + 
SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse(r.isEmpty()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("stale done"); // exists works on stale and we see the put after the flush - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getExists()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("exists stale after flush done"); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); Delete d = new Delete(b1); table.delete(d); @@ -587,7 +591,7 @@ public class TestReplicasClient { SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 288d115..2d19e72 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -26,8 +26,10 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Triple; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -631,16 +634,20 @@ public class TestCatalogJanitor { new byte[0]); Thread.sleep(1001); + final Pair, Set> pair = + new Pair, Set>(); final Map splitParents = new TreeMap(new SplitParentFirstComparator()); splitParents.put(parent, createResult(parent, splita, splitb)); splita.setOffline(true); //simulate that splita goes offline when it is split splitParents.put(splita, createResult(splita, splitaa,splitab)); + pair.setFirst(splitParents); + pair.setSecond(null); final Map mergedRegions = new TreeMap(); CatalogJanitor janitor = spy(new CatalogJanitor(server, services)); - doReturn(new Triple, Map>( - 10, mergedRegions, splitParents)).when(janitor) + doReturn(new Triple, Pair, Set>>( + 10, mergedRegions, pair)).when(janitor) .getMergedRegionsAndSplitParents(); //create ref from splita to parent @@ -824,6 +831,62 @@ public class TestCatalogJanitor { janitor.join(); } + + @Test + public void testCatalogJanitorOperationsWithReplicas() throws Exception { + HBaseTestingUtility htu = new HBaseTestingUtility(); + setRootDirAndCleanIt(htu, "testScanDoesNotCleanRegionsWithExistingParents"); + Server server = new MockServer(htu); + MasterServices services = new MockMasterServices(server); + + final HTableDescriptor htd = 
createHTableDescriptor(); + + // Create regions: aaa->{lastEndKey}, aaa->bbb, bbb->{lastEndKey} + + // Parent + HRegionInfo parent = new HRegionInfo(htd.getTableName(), Bytes.toBytes("aaa"), + new byte[0], true); + // Sleep a second else the encoded name on these regions comes out + // same for all with same start key and made in same second. + Thread.sleep(1001); + + // Daughter a + HRegionInfo splita = new HRegionInfo(htd.getTableName(), Bytes.toBytes("aaa"), + Bytes.toBytes("bbb"), true); + Thread.sleep(1001); + + // Daughter b + HRegionInfo splitb = new HRegionInfo(htd.getTableName(), Bytes.toBytes("bbb"), + new byte[0]); + Thread.sleep(1001); + + final Pair, Set> pair = + new Pair, Set>(); + final Map splitParents = + new TreeMap(new SplitParentFirstComparator()); + splitParents.put(parent, createResult(parent, splita, splitb)); + pair.setFirst(splitParents); + Set referencedParents = new HashSet(); + referencedParents.add(parent); + pair.setSecond(referencedParents); + + final Map mergedRegions = new TreeMap(); + CatalogJanitor janitor = spy(new CatalogJanitor(server, services)); + doReturn(new Triple, Pair, Set>>( + 10, mergedRegions, pair)).when(janitor) + .getMergedRegionsAndSplitParents(); + + //since parent is referenced, it will not delete the row + assertEquals(0, janitor.scan()); + + pair.setSecond(null); + //now, there are no references to parent, and the janitor can cleanup + assertEquals(1, janitor.scan()); + + services.stop("test finished"); + janitor.join(); + } + /** * @param description description of the files for logging * @param storeFiles the status of the files to log diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index 6981415..0b811ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.math.RandomUtils; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -287,6 +289,45 @@ public class TestRegionMergeTransactionOnCluster { } } + @Test + public void testMergeWithReplicas() throws Exception { + final TableName tableName = TableName.valueOf("testMergeWithReplicas"); + // Create table and load data. 
+ createTableAndLoadData(master, tableName, 5, 2); + List> initialRegionToServers = + MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(), + master.getShortCircuitConnection(), tableName); + // Merge 1st and 2nd region + PairOfSameType mergedRegions = mergeRegionsAndVerifyRegionNum(master, tableName, + 0, 2, 5 * 2 - 2); + List> currentRegionToServers = + MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(), + master.getShortCircuitConnection(), tableName); + List initialRegions = new ArrayList(); + for (Pair p : initialRegionToServers) { + initialRegions.add(p.getFirst()); + } + List currentRegions = new ArrayList(); + for (Pair p : currentRegionToServers) { + currentRegions.add(p.getFirst()); + } + assertTrue(initialRegions.contains(mergedRegions.getFirst())); //this is the first region + assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getFirst(), 1))); //this is the replica of the first region + assertTrue(initialRegions.contains(mergedRegions.getSecond())); //this is the second region + assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getSecond(), 1))); //this is the replica of the second region + assertTrue(!initialRegions.contains(currentRegions.get(0))); //this is the new region + assertTrue(!initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + currentRegions.get(0), 1))); //replica of the new region + assertTrue(currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + currentRegions.get(0), 1))); //replica of the new region + assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getFirst(), 1))); //replica of the merged region + assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getSecond(), 1))); //replica of the merged region + } + private PairOfSameType mergeRegionsAndVerifyRegionNum( HMaster master, TableName tablename, int regionAnum, int regionBnum, int expectedRegionNum) throws Exception { @@ -335,11 +376,11 @@ public class TestRegionMergeTransactionOnCluster { private HTable createTableAndLoadData(HMaster master, TableName tablename) throws Exception { - return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM); + return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1); } private HTable createTableAndLoadData(HMaster master, TableName tablename, - int numRegions) throws Exception { + int numRegions, int replication) throws Exception { assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions); byte[][] splitRows = new byte[numRegions - 1][]; for (int i = 0; i < splitRows.length; i++) { @@ -347,6 +388,9 @@ public class TestRegionMergeTransactionOnCluster { } HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows); + if (replication > 1) { + HBaseTestingUtility.setReplicas(admin, tablename, replication); + } loadData(table); verifyRowCount(table, ROWSIZE); @@ -356,7 +400,7 @@ public class TestRegionMergeTransactionOnCluster { while (System.currentTimeMillis() < timeout) { tableRegions = MetaTableAccessor.getTableRegionsAndLocations( master.getZooKeeper(), master.getShortCircuitConnection(), tablename); - if (tableRegions.size() == numRegions) + if (tableRegions.size() == numRegions * replication) break; Thread.sleep(250); } @@ -364,7 +408,7 @@ public class TestRegionMergeTransactionOnCluster { tableRegions = MetaTableAccessor.getTableRegionsAndLocations( master.getZooKeeper(), 
master.getShortCircuitConnection(), tablename); LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions)); - assertEquals(numRegions, tableRegions.size()); + assertEquals(numRegions * replication, tableRegions.size()); return table; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index b7626e9..2fbfe3b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -28,8 +28,10 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.io.InterruptedIOException; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; @@ -38,6 +40,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseIOException; @@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; @@ -57,7 +62,10 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; @@ -65,6 +73,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro; import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination; import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; @@ -80,6 +89,7 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -128,6 +138,8 @@ public class TestSplitTransactionOnCluster { TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000); useZKForAssignment = TESTING_UTIL.getConfiguration().getBoolean( 
"hbase.assignment.usezk", false); + TESTING_UTIL.getConfiguration().set("hbase.coprocessor.region.classes", + MetaUpdateObserver.class.getName()); TESTING_UTIL.startMiniCluster(NB_SERVERS); } @@ -924,6 +936,107 @@ public class TestSplitTransactionOnCluster { } } + @Test + public void testSplitWithRegionReplicas() throws Exception { + ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL); + final TableName tableName = + TableName.valueOf("foobar"); + HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar"); + htd.setRegionReplication(2); + htd.addCoprocessor(SlowMeCopro.class.getName()); + // Create table then get the single region for our new table. + HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")}, + TESTING_UTIL.getConfiguration()); + int count; + do { + count = cluster.getRegions(tableName).size(); + } while (count != 2); + List regions = null; + try { + regions = cluster.getRegions(tableName); + int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName()); + HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); + insertData(tableName.getName(), admin, t); + // Turn off balancer so it doesn't cut in and mess up our placements. + admin.setBalancerRunning(false, true); + // Turn off the meta scanner so it don't remove parent on us. + cluster.getMaster().setCatalogJanitorEnabled(false); + boolean tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(), + tableName); + assertEquals("The specified table should be present.", true, tableExists); + final HRegion region = findSplittableRegion(regions); + regionServerIndex = cluster.getServerWith(region.getRegionName()); + regionServer = cluster.getRegionServer(regionServerIndex); + assertTrue("not able to find a splittable region", region != null); + String node = ZKAssign.getNodeName(regionServer.getZooKeeper(), + region.getRegionInfo().getEncodedName()); + regionServer.getZooKeeper().sync(node); + SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2")); + MetaUpdateObserver.deleteCellCount = 0; + try { + st.prepare(); + MetaUpdateObserver.shouldRunChecks = true; + st.execute(regionServer, regionServer); + } catch (IOException e) { + e.printStackTrace(); + fail("Split execution should have succeeded with no exceptions thrown " + e); + } + TESTING_UTIL.waitUntilAllRegionsAssigned(tableName); + tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(), + tableName); + assertEquals("The specified table should be present.", true, tableExists); + // exists works on stale and we see the put after the flush + byte[] b1 = "row1".getBytes(); + Get g = new Get(b1); + g.setConsistency(Consistency.STRONG); + // The following GET will make a trip to the meta to get the new location of the 1st daughter + // In the process it will also get the location of the replica of the daughter (initially + // pointing to the parent's replica) + Result r = t.get(g); + Assert.assertFalse(r.isStale()); + LOG.info("exists stale after flush done"); + + SlowMeCopro.getCdl().set(new CountDownLatch(1)); + g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + // This will succeed because in the previous GET we get the location of the replica + r = t.get(g); + Assert.assertTrue(r.isStale()); + SlowMeCopro.getCdl().get().countDown(); + // Ensure that we did the right meta updates for the 'daughter' columns + // We would have done a couple of updates. 
We would have added the 'daughter_0001' + // column to the daughter rows, back referencing the parent replicas. + // We would have deleted the daughter_0001 column subsequently. + // The deleteCellCount is 2 because the delete would be done + // for all replica location updates in the meta (for the two + // replicas of the daughters) + LOG.debug("DeleteCellCount " + MetaUpdateObserver.deleteCellCount + " PutCellCount " + + MetaUpdateObserver.putCellCount + " "); + assert(MetaUpdateObserver.deleteCellCount == 2); + assert(MetaUpdateObserver.putCellCount == 2); + assert(!Arrays.equals(MetaUpdateObserver.rowsInPut[0], + MetaUpdateObserver.rowsInPut[1])); + assert(!Arrays.equals(MetaUpdateObserver.rowsInDelete[0], + MetaUpdateObserver.rowsInDelete[1])); + assert(!Arrays.equals(MetaUpdateObserver.rowsInDelete[1], + MetaUpdateObserver.rowsInDelete[2])); + assert(!Arrays.equals(MetaUpdateObserver.rowsInDelete[0], + MetaUpdateObserver.rowsInDelete[2])); + //assert(false); + } finally { + MetaUpdateObserver.shouldRunChecks = false; + SlowMeCopro.getCdl().get().countDown(); + if (regions != null) { + String node = ZKAssign.getNodeName(zkw, regions.get(0).getRegionInfo() + .getEncodedName()); + ZKUtil.deleteNodeFailSilent(zkw, node); + } + admin.setBalancerRunning(true, false); + cluster.getMaster().setCatalogJanitorEnabled(true); + t.close(); + } + } + private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException, InterruptedException { Put p = new Put(Bytes.toBytes("row1")); @@ -1192,7 +1305,7 @@ public class TestSplitTransactionOnCluster { private HRegion findSplittableRegion(final List regions) throws InterruptedException { for (int i = 0; i < 5; ++i) { for (HRegion r: regions) { - if (r.isSplittable()) { + if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) { return(r); } } @@ -1388,6 +1501,51 @@ public class TestSplitTransactionOnCluster { } } + public static class MetaUpdateObserver extends BaseRegionObserver { + static boolean shouldRunChecks = false; + static int deleteCellCount; + static int putCellCount; + static byte[][] rowsInPut = new byte[10][]; + static byte[][] rowsInDelete = new byte[10][]; + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + if (e.getEnvironment().getRegion().getTableDesc().getTableName(). + equals(TableName.META_TABLE_NAME) && shouldRunChecks) { + NavigableMap> map = delete.getFamilyCellMap(); + List cells = map.get(HConstants.CATALOG_FAMILY); + if (cells == null) return; + for (Cell c : cells) { + if (c.getTypeByte() == KeyValue.Type.Delete.getCode() && + Arrays.equals(CellUtil.cloneQualifier(c), + MetaTableAccessor.getParentReplicaReferenceQualifier(1))) { + rowsInDelete[deleteCellCount++] = CellUtil.cloneRow(c); + } + } + e.bypass(); + } + } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) { + if (e.getEnvironment().getRegion().getTableDesc().getTableName(). 
+ equals(TableName.META_TABLE_NAME) && shouldRunChecks) { + NavigableMap> map = put.getFamilyCellMap(); + List cells = map.get(HConstants.CATALOG_FAMILY); + if (cells == null) return; + for (Cell c : cells) { + if (c.getTypeByte() == KeyValue.Type.Put.getCode()) { + if (Arrays.equals(CellUtil.cloneQualifier(c), + MetaTableAccessor.getParentReplicaReferenceQualifier(1))) { + rowsInPut[putCellCount++] = CellUtil.cloneRow(c); + } + } + } + } + } + } + public static class MockedRegionObserver extends BaseRegionObserver { private SplitTransaction st = null; private PairOfSameType daughterRegions = null; @@ -1428,8 +1586,8 @@ public class TestSplitTransactionOnCluster { daughterRegions.getFirst().getRegionInfo()); Put putB = MetaTableAccessor.makePutFromRegionInfo( daughterRegions.getSecond().getRegionInfo()); - st.addLocation(putA, rs.getServerName(), 1); - st.addLocation(putB, rs.getServerName(), 1); + st.addLocation(putA, rs.getServerName(), 1, 0); + st.addLocation(putB, rs.getServerName(), 1, 0); metaEntries.add(putA); metaEntries.add(putB); }
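
For context, here is a minimal, self-contained sketch of the parent replica reference mechanism the patch introduces: how the qualifier is encoded and how a meta row is resolved to a region info for a given replica, falling back to the (split) parent's replica until the daughter replica is assigned. It is illustrative only and not part of the patch; the "parent" prefix, '_' delimiter, and replica-id format are assumed to match HConstants.PARENT_REPLICA_REFERENCE_STR, META_REPLICA_ID_DELIMITER, and HRegionInfo.REPLICA_ID_FORMAT, and the class and method names below are hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class ParentReplicaRefSketch {
  static final String PARENT = "parent";   // stand-in for PARENT_REPLICA_REFERENCE_STR
  static final char DELIMITER = '_';       // stand-in for META_REPLICA_ID_DELIMITER
  static final String ID_FORMAT = "%04X";  // assumed to match HRegionInfo.REPLICA_ID_FORMAT

  // Mirrors getParentReplicaReferenceQualifier(): replica 0 gets the bare
  // prefix; replica N gets parent_000N.
  static String qualifier(int replicaId) {
    return replicaId == 0 ? PARENT
        : PARENT + DELIMITER + String.format(ID_FORMAT, replicaId);
  }

  // Prefer the back reference to the split parent's replica when present;
  // otherwise use the daughter's own region info. The map stands in for the
  // info-family cells of one meta row (qualifier -> cell value).
  static byte[] regionInfoForReplica(Map<String, byte[]> infoFamily,
      byte[] daughterRegionInfo, int replicaId) {
    byte[] parentRef = infoFamily.get(qualifier(replicaId));
    return parentRef != null ? parentRef : daughterRegionInfo;
  }

  public static void main(String[] args) {
    Map<String, byte[]> row = new TreeMap<String, byte[]>();
    row.put(qualifier(1), "hri-of-parent-replica-1".getBytes(StandardCharsets.UTF_8));
    byte[] own = "hri-of-daughter".getBytes(StandardCharsets.UTF_8);
    // Replica 1 resolves to the parent's replica until the delete of the
    // parent_0001 column lands; replica 2 resolves to the daughter itself.
    System.out.println(new String(regionInfoForReplica(row, own, 1), StandardCharsets.UTF_8));
    System.out.println(new String(regionInfoForReplica(row, own, 2), StandardCharsets.UTF_8));
  }
}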
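
Similarly, the placement rule used by doSplittingOfReplicas()/prepareDaughterReplicaForAssignment() can be summarized in isolation: pin a daughter's replica N to the server currently hosting the parent's replica N, and fall back to a random online server when that location is unknown. The sketch below is illustrative only and not part of the patch; it uses simplified stand-in types (String instead of HRegionInfo/ServerName), and planDaughterReplicas is a hypothetical name.

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class DaughterReplicaPlacementSketch {
  static Map<String, String> planDaughterReplicas(String daughter, int numReplicas,
      Map<Integer, String> parentReplicaLocations, List<String> onlineServers, Random rng) {
    Map<String, String> plan = new LinkedHashMap<String, String>();
    for (int i = 1; i < numReplicas; i++) { // replica 0 is opened by the split itself
      String server = parentReplicaLocations.get(i); // co-locate with the parent's replica i
      if (server == null) {
        server = onlineServers.get(rng.nextInt(onlineServers.size())); // fallback: random RS
      }
      plan.put(daughter + "_replica" + i, server);
    }
    return plan;
  }

  public static void main(String[] args) {
    Map<Integer, String> parentLocations = new HashMap<Integer, String>();
    parentLocations.put(1, "rs1:16020"); // parent replica 1 known; replica 2 unknown
    System.out.println(planDaughterReplicas("daughterA", 3, parentLocations,
        Arrays.asList("rs1:16020", "rs2:16020"), new Random(42)));
  }
}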