commit 8b4e774b654f152f73acd46a83e7a71d760c9730 Author: Devaraj Das Date: Tue Jul 1 14:21:29 2014 -0700 HBASE-11261. Handle splitting of regions that have region_replication greater than one. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java index 9517113..093e2e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java @@ -187,7 +187,7 @@ public class MetaReader { * @return An {@link HTable} for hbase:meta * @throws IOException */ - static HTable getMetaHTable(final CatalogTracker ct) + public static HTable getMetaHTable(final CatalogTracker ct) throws IOException { return getHTable(ct, TableName.META_TABLE_NAME); } @@ -613,7 +613,7 @@ public class MetaReader { * Returns the column family used for meta columns. * @return HConstants.CATALOG_FAMILY. */ - protected static byte[] getFamily() { + public static byte[] getFamily() { return HConstants.CATALOG_FAMILY; } @@ -665,6 +665,19 @@ public class MetaReader { } /** + * Returns the column qualifier for daughter column for replicaId + * @param replicaId the replicaId of the region + * @return a byte[] for daughter column qualifier + */ + @VisibleForTesting + public static byte[] getDaughterReplicaQualifier(int replicaId) { + return replicaId == 0 + ? HConstants.DAUGHTER_QUALIFIER + : Bytes.toBytes(HConstants.DAUGHTER_QUALIFIER_STR + META_REPLICA_ID_DELIMITER + + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId)); + } + + /** * Parses the replicaId from the server column qualifier. See top of the class javadoc * for the actual meta layout * @param serverColumn the column qualifier @@ -745,8 +758,16 @@ public class MetaReader { if (replicaId < 0) { break; } + byte[] daughter = getDaughterReplicaQualifier(replicaId); + // if we have a daughter_ column the value of that is the + // hri of the (split) parent replica. 
Let's use that as the location of the + // daughter replica until the daughter replica is actually created and assigned + HRegionInfo h = getHRegionInfo(r, daughter); + if (h == null) { + h = regionInfo; + } - locations.add(getRegionLocation(r, regionInfo, replicaId)); + locations.add(getRegionLocation(r, h, replicaId)); } return new RegionLocations(locations); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c2709f5..ce22915 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -425,6 +425,11 @@ public final class HConstants { public static final byte [] SERVERNAME_QUALIFIER = Bytes.toBytes(SERVERNAME_QUALIFIER_STR); + /** The daughter qualifier */ + public static final String DAUGHTER_QUALIFIER_STR = "daughter"; + /** The daughter qualifier */ + public static final byte [] DAUGHTER_QUALIFIER = Bytes.toBytes(DAUGHTER_QUALIFIER_STR); + /** The lower-half split region column qualifier */ public static final byte [] SPLITA_QUALIFIER = Bytes.toBytes("splitA"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java index 191251c..28c55e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java @@ -30,12 +30,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; @@ -383,6 +386,9 @@ public class MetaEditor extends MetaReader { HRegionInfo copyOfParent = new HRegionInfo(parent); copyOfParent.setOffline(true); copyOfParent.setSplit(true); + byte[] key = MetaReader.getMetaKeyForRegion(parent); + Result result = meta.get(new Get(key)); + RegionLocations rl = MetaReader.getRegionLocations(result); //Put for parent Put putParent = makePutFromRegionInfo(copyOfParent); @@ -394,6 +400,22 @@ public class MetaEditor extends MetaReader { addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine. 
addLocation(putB, sn, 1, splitB.getReplicaId()); + // bootstrap the daughter replicas to point to the locations of the old parent replicas + // via a column daughter_ + for (int i = 1; i < rl.size(); i++) { + ServerName s; + byte[] parentHri; + if (rl.getRegionLocation(i) == null) { // if null then don't know anything about replica + continue; + } else { + s = rl.getRegionLocation(i).getServerName(); + parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray(); + } + addLocation(putA, s, rl.getRegionLocation(i).getSeqNum(), i); + addLocation(putB, s, rl.getRegionLocation(i).getSeqNum(), i); + putA.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i), parentHri); + putB.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i), parentHri); + } byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER); multiMutate(meta, tableRow, putParent, putA, putB); @@ -463,9 +485,20 @@ public class MetaEditor extends MetaReader { HRegionInfo regionInfo, ServerName sn, long openSeqNum) throws IOException { // region replicas are kept in the primary region's row - Put put = new Put(getMetaKeyForRegion(regionInfo)); + byte[] metaRow = getMetaKeyForRegion(regionInfo); + Put put = new Put(metaRow); addLocation(put, sn, openSeqNum, regionInfo.getReplicaId()); - putToCatalogTable(catalogTracker, put); + put.addImmutable(HConstants.CATALOG_FAMILY, + MetaReader.getServerColumn(regionInfo.getReplicaId()), + Bytes.toBytes(sn.getHostAndPort())); + if (regionInfo.getReplicaId() != 0) { + Delete d = new Delete(metaRow); + d.deleteColumn(HConstants.CATALOG_FAMILY, + MetaReader.getDaughterReplicaQualifier(regionInfo.getReplicaId())); + multiMutate(MetaReader.getMetaHTable(catalogTracker), metaRow, put, d); + } else { + putToCatalogTable(catalogTracker, put); + } LOG.info("Updated row " + regionInfo.getRegionNameAsString() + " with server=" + sn); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index f407569..368327f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -3737,6 +3738,13 @@ public class AssignmentManager extends ZooKeeperListener { } if (et == EventType.RS_ZK_REGION_SPLIT) { + // split replicas + try { + doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b, + ((MasterServices)server).getTableDescriptors().get(p.getTable()).getRegionReplication()); + } catch (IOException e) { + LOG.warn("Failed to handle splits for replica regions " + e); + } LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node"); // Remove region from ZK try { @@ -3769,6 +3777,47 @@ public class AssignmentManager extends ZooKeeperListener { return true; } + private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a, + final HRegionInfo hri_b, final int numReplicas) { + // create new regions for the replica, and assign them to match with the + // current replica assignments. 
+    // If replica1 of parent is assigned to RS1, the replica1s of the daughters
+    // will be on the same machine.
+    Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
+    for (int i = 1; i < numReplicas; i++) {
+      prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
+      prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
+    }
+    try {
+      assign(map);
+    } catch (IOException e) {
+      LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
+    } catch (InterruptedException e) {
+      LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
+    }
+    // unassign the old replicas
+    for (int i = 1; i < numReplicas; i++) {
+      HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(parentHri, i);
+      LOG.debug("Unassigning replica for split parent " + h);
+      unassign(RegionReplicaUtil.getRegionInfoForReplica(parentHri, i));
+    }
+  }
+
+  private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
+      int replicaId, Map<HRegionInfo, ServerName> map) {
+    HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
+    HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
+        replicaId);
+    LOG.debug("Created replica region for daughter " + daughterReplica);
+    ServerName sn;
+    if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
+      map.put(daughterReplica, sn);
+    } else {
+      List<ServerName> servers = serverManager.getOnlineServersList();
+      sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
+      map.put(daughterReplica, sn);
+    }
+  }
+
   /**
    * A region is offline. The new state should be the specified one,
    * if not null. If the specified state is null, the new state is Offline.
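A note on the encoding used by MetaReader.getDaughterReplicaQualifier earlier in this patch: replica 0 maps to the bare "daughter" column, while every other replica gets a delimited, zero-padded suffix built from HRegionInfo.REPLICA_ID_FORMAT. A minimal sketch, assuming this patch is applied (the daughter_0001 spelling is the one the test comments further down rely on):

    import org.apache.hadoop.hbase.catalog.MetaReader;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DaughterQualifierDemo {
      public static void main(String[] args) {
        // Replica 0 reuses the plain "daughter" column.
        System.out.println(Bytes.toString(MetaReader.getDaughterReplicaQualifier(0)));
        // Replica 1 gets the delimited, zero-padded form: "daughter_0001".
        System.out.println(Bytes.toString(MetaReader.getDaughterReplicaQualifier(1)));
      }
    }
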
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 59bc01e..0207029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -23,7 +23,11 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -110,12 +115,13 @@ public class CatalogJanitor extends Chore {
   /**
    * Scans hbase:meta and returns a number of scanned rows, and a map of merged
    * regions, and an ordered map of split parents.
-   * @return triple of scanned rows, map of merged regions and map of split
-   *         parent regioninfos
+   * @return triple of scanned rows, map of merged regions, and a pair consisting of
+   *         the map of split parent regioninfos and the set of parents to which
+   *         daughters still hold back references [TODO: the signature is ugly; fix it]
    * @throws IOException
    */
-  Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents()
-      throws IOException {
+  Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+  getMergedRegionsAndSplitParents() throws IOException {
     return getMergedRegionsAndSplitParents(null);
   }
@@ -125,11 +131,13 @@ public class CatalogJanitor extends Chore {
    * null, return merged regions and split parents of all tables, else only the
    * specified table
    * @param tableName null represents all tables
-   * @return triple of scanned rows, and map of merged regions, and map of split
-   *         parent regioninfos
+   * @return triple of scanned rows, and map of merged regions, and a pair consisting of
+   *         the map of split parent regioninfos and the set of parents to which
+   *         daughters still hold back references [TODO: the signature is ugly; fix it]
    * @throws IOException
    */
-  Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents(
+  Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+  getMergedRegionsAndSplitParents(
       final TableName tableName) throws IOException {
     final boolean isTableSpecified = (tableName != null);
     // TODO: Only works with single hbase:meta region currently.  Fix.
@@ -139,6 +147,7 @@ public class CatalogJanitor extends Chore {
     final Map<HRegionInfo, Result> splitParents =
       new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
     final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
+    final Set<HRegionInfo> parentsReferenced = new HashSet<HRegionInfo>();
     // This visitor collects split parents and counts rows in the hbase:meta table
     MetaScannerVisitor visitor = new MetaScanner.MetaScannerVisitorBase() {
@@ -153,6 +162,19 @@ public class CatalogJanitor extends Chore {
           // Another table, stop scanning
           return false;
         }
+        int replicaId = 0;
+        NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = r.getNoVersionMap();
+        NavigableMap<byte[], byte[]> infoMap = familyMap.get(MetaReader.getFamily());
+        byte[] daughterColumn = MetaReader.getDaughterReplicaQualifier(replicaId);
+        SortedMap<byte[], byte[]> serverMap = infoMap.tailMap(daughterColumn, false);
+        if (!serverMap.isEmpty()) {
+          for (Entry<byte[], byte[]> entry : serverMap.entrySet()) {
+            HRegionInfo h = HRegionInfo.parseFromOrNull(entry.getValue());
+            if (h == null) continue; // skip cells that do not hold a serialized HRegionInfo
+            // the parent row should not be deleted
+            parentsReferenced.add(RegionReplicaUtil.getRegionInfoForDefaultReplica(h));
+          }
+        }
         if (info.isSplitParent()) splitParents.put(info, r);
         if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
           mergedRegions.put(info, r);
@@ -166,8 +188,9 @@ public class CatalogJanitor extends Chore {
     // the start row
     MetaScanner.metaScan(server.getConfiguration(), null, visitor, tableName);
-    return new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
-        count.get(), mergedRegions, splitParents);
+    return new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>>(count.get(), mergedRegions, new Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>(splitParents, parentsReferenced));
   }
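The visitor above treats any info-family cell that sorts after the bare daughter column and whose value parses as an HRegionInfo as a daughter back reference. A condensed, self-contained restatement of that lookup for a single daughter row (a sketch only; the referencedParent helper and the explicit daughter_ prefix check are illustrative additions, not part of this patch):

    import java.util.Map.Entry;
    import java.util.NavigableMap;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.catalog.MetaReader;
    import org.apache.hadoop.hbase.client.RegionReplicaUtil;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    final class DaughterBackRefs {
      /** Returns the split parent (as its default replica) that this daughter row
       *  still back-references, or null if no daughter_NNNN column remains. */
      static HRegionInfo referencedParent(Result daughterRow) {
        NavigableMap<byte[], byte[]> info =
            daughterRow.getNoVersionMap().get(MetaReader.getFamily());
        byte[] bare = MetaReader.getDaughterReplicaQualifier(0);
        for (Entry<byte[], byte[]> e : info.tailMap(bare, false).entrySet()) {
          // Only daughter_0001, daughter_0002, ... are back references; other
          // qualifiers (regioninfo, server, ...) also sort after "daughter",
          // so filter by prefix before parsing.
          if (!Bytes.startsWith(e.getKey(),
              Bytes.toBytes(HConstants.DAUGHTER_QUALIFIER_STR))) {
            continue;
          }
          HRegionInfo parentReplica = HRegionInfo.parseFromOrNull(e.getValue());
          if (parentReplica != null) {
            return RegionReplicaUtil.getRegionInfoForDefaultReplica(parentReplica);
          }
        }
        return null;
      }
    }
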
@@ -216,8 +238,8 @@ public class CatalogJanitor extends Chore {
     if (!alreadyRunning.compareAndSet(false, true)) {
       return 0;
     }
-    Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple =
-      getMergedRegionsAndSplitParents();
+    Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+      scanTriple = getMergedRegionsAndSplitParents();
     int count = scanTriple.getFirst();
     /**
      * clean merge regions first
      */
@@ -244,13 +266,17 @@ public class CatalogJanitor extends Chore {
     /**
      * clean split parents
      */
-    Map<HRegionInfo, Result> splitParents = scanTriple.getThird();
+    Map<HRegionInfo, Result> splitParents = scanTriple.getThird().getFirst();
+    // Get the parents that are referenced from the daughter replicas, and don't
+    // delete the corresponding rows
+    Set<HRegionInfo> parentsReferenced = scanTriple.getThird().getSecond();

     // Now work on our list of found parents. See if any we can clean up.
     int splitCleaned = 0;
     // regions whose parents are still around
     HashSet<String> parentNotCleaned = new HashSet<String>();
     for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
+      if (parentsReferenced != null && parentsReferenced.contains(e.getKey())) continue;
       if (!parentNotCleaned.contains(e.getKey().getEncodedName()) &&
           cleanParent(e.getKey(), e.getValue())) {
         splitCleaned++;
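Before the SplitTransaction changes below, it helps to fix the meta layout in mind. Immediately after a split commits with region_replication = 2, the daughter's meta row carries the following info-family columns (names follow the server_NNNN / daughter_NNNN qualifier conventions from HConstants and MetaReader; this listing is inferred from the addLocation calls in this patch rather than quoted from it):

    info:regioninfo           serialized HRegionInfo of the daughter
    info:server, info:serverstartcode, info:seqnumDuringOpen
                              the daughter's primary, i.e. the splitting server (openSeqNum = 1)
    info:server_0001, info:serverstartcode_0001, info:seqnumDuringOpen_0001
                              copied from the parent replica's last known location
    info:daughter_0001        serialized HRegionInfo of the parent's replica 1 (the back reference)

Once the real daughter replica is opened, MetaEditor.updateRegionLocation rewrites the _0001 location columns and deletes info:daughter_0001 in the same multiMutate, which is also what releases the CatalogJanitor's hold on the parent row.
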
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index e8754d6..651ebf1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -37,12 +37,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
@@ -490,30 +495,44 @@ public class SplitTransaction {
     copyOfParent.setOffline(true);
     copyOfParent.setSplit(true);

+    HTable meta = MetaReader.getMetaHTable(catalogTracker);
     //Put for parent
     Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
     MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
     mutations.add(putParent);

+    byte[] key = MetaReader.getMetaKeyForRegion(parent);
+    Result result = meta.get(new Get(key));
+    RegionLocations rl = MetaReader.getRegionLocations(result);
     //Puts for daughters
     Put putA = MetaEditor.makePutFromRegionInfo(splitA);
     Put putB = MetaEditor.makePutFromRegionInfo(splitB);

-    addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
-    addLocation(putB, serverName, 1);
+    //these are new regions, openSeqNum = 1 is fine.
+    addLocation(putA, serverName, 1, 0);
+    addLocation(putB, serverName, 1, 0);
+    // set the replicas to point to the locations of the old replicas
+    for (int i = 1; i < rl.size(); i++) {
+      ServerName s;
+      byte[] parentHri;
+      if (rl.getRegionLocation(i) == null) { // if null then don't know anything about replica
+        continue;
+      } else {
+        s = rl.getRegionLocation(i).getServerName();
+        parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray();
+      }
+      addLocation(putA, s, rl.getRegionLocation(i).getSeqNum(), i);
+      addLocation(putB, s, rl.getRegionLocation(i).getSeqNum(), i);
+      putA.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i), parentHri);
+      putB.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i), parentHri);
+    }
     mutations.add(putA);
     mutations.add(putB);
     MetaEditor.mutateMetaTable(catalogTracker, mutations);
   }

-  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-      Bytes.toBytes(sn.getHostAndPort()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-      Bytes.toBytes(sn.getStartcode()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
-      Bytes.toBytes(openSeqNum));
-    return p;
+  public Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId) {
+    return MetaEditor.addLocation(p, sn, openSeqNum, replicaId);
   }

   /*
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 1f1f3d4..bc3a1f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -92,7 +92,7 @@ public class TestReplicasClient {
     static final AtomicLong sleepTime = new AtomicLong(0);
     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
     static final AtomicInteger countOfNext = new AtomicInteger(0);
-    static final AtomicReference<CountDownLatch> cdl =
+    private static final AtomicReference<CountDownLatch> cdl =
       new AtomicReference<CountDownLatch>(new CountDownLatch(0));
     Random r = new Random();
     public SlowMeCopro() {
@@ -129,7 +129,7 @@ public class TestReplicasClient {
     private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
-        CountDownLatch latch = cdl.get();
+        CountDownLatch latch = getCdl().get();
         try {
           if (sleepTime.get() > 0) {
             LOG.info("Sleeping for " + sleepTime.get() + " ms");
@@ -148,6 +148,10 @@ public class TestReplicasClient {
         LOG.info("We're not the primary replicas.");
       }
     }
+
+    public static AtomicReference<CountDownLatch> getCdl() {
+      return cdl;
+    }
   }

   @BeforeClass
@@ -291,7 +295,7 @@ public class TestReplicasClient {
   public void testUseRegionWithoutReplica() throws Exception {
     byte[] b1 = "testUseRegionWithoutReplica".getBytes();
     openRegion(hriSecondary);
-    SlowMeCopro.cdl.set(new CountDownLatch(0));
+    SlowMeCopro.getCdl().set(new CountDownLatch(0));
     try {
       Get g = new Get(b1);
       Result r = table.get(g);
@@ -347,14 +351,14 @@ public class TestReplicasClient {
     byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
     openRegion(hriSecondary);

-    SlowMeCopro.cdl.set(new CountDownLatch(1));
+    SlowMeCopro.getCdl().set(new CountDownLatch(1));
     try {
       Get g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       Result r = table.get(g);
       Assert.assertTrue(r.isStale());
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+
SlowMeCopro.getCdl().get().countDown(); closeRegion(hriSecondary); } } @@ -465,13 +469,13 @@ public class TestReplicasClient { LOG.info("sleep and is not stale done"); // But if we ask for stale we will get it - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("stale done"); @@ -484,14 +488,14 @@ public class TestReplicasClient { LOG.info("exists not stale done"); // exists works on stale but don't see the put - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse("The secondary has stale data", r.getExists()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("exists stale before flush done"); flushRegion(hriPrimary); @@ -500,28 +504,28 @@ public class TestReplicasClient { Thread.sleep(1000 + REFRESH_PERIOD * 2); // get works and is not stale - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse(r.isEmpty()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("stale done"); // exists works on stale and we see the put after the flush - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getExists()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("exists stale after flush done"); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); Delete d = new Delete(b1); table.delete(d); @@ -587,7 +591,7 @@ public class TestReplicasClient { SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 3b69c16..01ec066 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -26,8 +26,10 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -74,6 +76,7 @@ import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Triple; import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
@@ -628,16 +631,20 @@ public class TestCatalogJanitor {
         new byte[0]);
     Thread.sleep(1001);

+    final Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>> pair =
+        new Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>();
     final Map<HRegionInfo, Result> splitParents =
         new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
     splitParents.put(parent, createResult(parent, splita, splitb));
     splita.setOffline(true); //simulate that splita goes offline when it is split
     splitParents.put(splita, createResult(splita, splitaa, splitab));
+    pair.setFirst(splitParents);
+    pair.setSecond(null);

     final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
     CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
-    doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
-        10, mergedRegions, splitParents)).when(janitor)
+    doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>>(10, mergedRegions, pair)).when(janitor)
       .getMergedRegionsAndSplitParents();

     //create ref from splita to parent
@@ -821,6 +828,62 @@ public class TestCatalogJanitor {
     janitor.join();
   }

+  @Test
+  public void testCatalogJanitorOperationsWithReplicas() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    setRootDirAndCleanIt(htu, "testScanDoesNotCleanRegionsWithExistingParents");
+    Server server = new MockServer(htu);
+    MasterServices services = new MockMasterServices(server);
+
+    final HTableDescriptor htd = createHTableDescriptor();
+
+    // Create regions: aaa->{lastEndKey}, aaa->bbb, bbb->{lastEndKey}
+
+    // Parent
+    HRegionInfo parent = new HRegionInfo(htd.getTableName(), Bytes.toBytes("aaa"),
+        new byte[0], true);
+    // Sleep a second else the encoded name on these regions comes out
+    // same for all with same start key and made in same second.
+    Thread.sleep(1001);
+
+    // Daughter a
+    HRegionInfo splita = new HRegionInfo(htd.getTableName(), Bytes.toBytes("aaa"),
+        Bytes.toBytes("bbb"), true);
+    Thread.sleep(1001);
+
+    // Daughter b
+    HRegionInfo splitb = new HRegionInfo(htd.getTableName(), Bytes.toBytes("bbb"),
+        new byte[0]);
+    Thread.sleep(1001);
+
+    final Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>> pair =
+        new Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>();
+    final Map<HRegionInfo, Result> splitParents =
+        new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
+    splitParents.put(parent, createResult(parent, splita, splitb));
+    pair.setFirst(splitParents);
+    Set<HRegionInfo> referencedParents = new HashSet<HRegionInfo>();
+    referencedParents.add(parent);
+    pair.setSecond(referencedParents);
+
+    final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
+    CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
+    doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>>(10, mergedRegions, pair)).when(janitor)
+      .getMergedRegionsAndSplitParents();
+
+    //since parent is referenced, it will not delete the row
+    assertEquals(0, janitor.scan());
+
+    pair.setSecond(null);
+    //now, there are no references to parent, and the janitor can cleanup
+    assertEquals(1, janitor.scan());
+
+    services.stop("test finished");
+    janitor.join();
+  }
+
   /**
    * @param description description of the files for logging
    * @param storeFiles the status of the files to log
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index b4a780f..4008bc7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -28,8 +28,10 @@ import static org.junit.Assert.fail;

 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Arrays;
 import java.util.List;
import java.util.Map; +import java.util.NavigableMap; import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; @@ -38,6 +40,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseIOException; @@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; @@ -56,9 +61,13 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; @@ -66,6 +75,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro; import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination; import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; @@ -81,6 +91,7 @@ import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -129,6 +140,8 @@ public class TestSplitTransactionOnCluster { TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000); useZKForAssignment = TESTING_UTIL.getConfiguration().getBoolean( "hbase.assignment.usezk", false); + TESTING_UTIL.getConfiguration().set("hbase.coprocessor.region.classes", + MetaUpdateObserver.class.getName()); TESTING_UTIL.startMiniCluster(NB_SERVERS); } @@ -925,6 +938,105 @@ public class TestSplitTransactionOnCluster { } } + @Test + public void testSplitWithRegionReplicas() throws Exception { + ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL); + final TableName tableName = + TableName.valueOf("foobar"); + HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar"); + htd.setRegionReplication(2); + htd.addCoprocessor(SlowMeCopro.class.getName()); + // Create table then get the single region for our new table. 
+    HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")},
+        TESTING_UTIL.getConfiguration());
+    TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
+    List<HRegion> regions = null;
+    try {
+      regions = cluster.getRegions(tableName);
+      int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
+      HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
+      insertData(tableName.getName(), admin, t);
+      // Turn off balancer so it doesn't cut in and mess up our placements.
+      admin.setBalancerRunning(false, true);
+      // Turn off the meta scanner so it doesn't remove the parent on us.
+      cluster.getMaster().setCatalogJanitorEnabled(false);
+      boolean tableExists = MetaReader.tableExists(regionServer.getCatalogTracker(),
+          tableName);
+      assertEquals("The specified table should be present.", true, tableExists);
+      final HRegion region = findSplittableRegion(regions);
+      assertTrue("not able to find a splittable region", region != null);
+      regionServerIndex = cluster.getServerWith(region.getRegionName());
+      regionServer = cluster.getRegionServer(regionServerIndex);
+      String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
+          region.getRegionInfo().getEncodedName());
+      regionServer.getZooKeeper().sync(node);
+      LOG.debug("CURRENT REGION COUNT "
+          + TESTING_UTIL.getHBaseAdmin().getClusterStatus().getRegionsCount());
+      SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2"));
+      try {
+        st.prepare();
+        MetaUpdateObserver.shouldRunChecks = true;
+        st.execute(regionServer, regionServer);
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail("Split execution should have succeeded with no exceptions thrown " + e);
+      }
+      TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
+      LOG.debug("CURRENT REGION COUNT "
+          + TESTING_UTIL.getHBaseAdmin().getClusterStatus().getRegionsCount());
+      tableExists = MetaReader.tableExists(regionServer.getCatalogTracker(),
+          tableName);
+      assertEquals("The specified table should be present.", true, tableExists);
+      // Do a STRONG read of a row that landed in the first daughter.
+      byte[] b1 = "row1".getBytes();
+      Get g = new Get(b1);
+      g.setConsistency(Consistency.STRONG);
+      // The following GET will make a trip to the meta to get the new location of the
+      // 1st daughter. In the process it will also get the location of the replica of
+      // the daughter (initially pointing to the parent's replica).
+      Result r = t.get(g);
+      Assert.assertFalse(r.isStale());
+      LOG.info("STRONG read of daughter done");
+      MetaReader.fullScanMetaAndPrint(new CatalogTracker(TESTING_UTIL.getConfiguration()));
+
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      // This will succeed because the previous GET fetched the location of the replica.
+      r = t.get(g);
+      Assert.assertTrue(r.isStale());
+      SlowMeCopro.getCdl().get().countDown();
+      // Ensure that we did the right meta updates for the 'daughter' columns.
+      // We would have done a couple of updates: added the 'daughter_0001' column
+      // to the daughter rows, back referencing the parent replicas, and
+      // deleted the daughter_0001 column subsequently.
+      // The deleteCellCount is 3 (as opposed to 2) because the delete would be done
+      // for all replica location updates in the meta (in this case we have three replica
+      // regions that we try to assign - the replica of the original parent, and the two
+      // replicas of the daughters)
+      assert(MetaUpdateObserver.deleteCellCount == 3);
+      assert(MetaUpdateObserver.putCellCount == 2);
+      assert(!Arrays.equals(MetaUpdateObserver.rowsInPut[0],
+          MetaUpdateObserver.rowsInPut[1]));
+      assert(!Arrays.equals(MetaUpdateObserver.rowsInDelete[0],
+          MetaUpdateObserver.rowsInDelete[1]));
+      assert(!Arrays.equals(MetaUpdateObserver.rowsInDelete[1],
+          MetaUpdateObserver.rowsInDelete[2]));
+      assert(!Arrays.equals(MetaUpdateObserver.rowsInDelete[0],
+          MetaUpdateObserver.rowsInDelete[2]));
+      //assert(false);
+    } finally {
+      MetaUpdateObserver.shouldRunChecks = false;
+      SlowMeCopro.getCdl().get().countDown();
+      if (regions != null) {
+        String node = ZKAssign.getNodeName(zkw, regions.get(0).getRegionInfo()
+            .getEncodedName());
+        ZKUtil.deleteNodeFailSilent(zkw, node);
+      }
+      admin.setBalancerRunning(true, false);
+      cluster.getMaster().setCatalogJanitorEnabled(true);
+      t.close();
+    }
+  }
+
   private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t)
       throws IOException, InterruptedException {
     Put p = new Put(Bytes.toBytes("row1"));
@@ -1193,7 +1305,7 @@ public class TestSplitTransactionOnCluster {
   private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {
     for (int i = 0; i < 5; ++i) {
       for (HRegion r: regions) {
-        if (r.isSplittable()) {
+        if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) {
           return(r);
         }
       }
@@ -1389,6 +1501,50 @@ public class TestSplitTransactionOnCluster {
     }
   }

+  public static class MetaUpdateObserver extends BaseRegionObserver {
+    static boolean shouldRunChecks = false;
+    static int deleteCellCount;
+    static int putCellCount;
+    static byte[][] rowsInPut = new byte[10][];
+    static byte[][] rowsInDelete = new byte[10][];
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Delete delete, final WALEdit edit, final Durability durability)
+        throws IOException {
+      if (e.getEnvironment().getRegion().getTableDesc().getTableName().
+          equals(TableName.META_TABLE_NAME) && shouldRunChecks) {
+        NavigableMap<byte[], List<Cell>> map = delete.getFamilyCellMap();
+        List<Cell> cells = map.get(HConstants.CATALOG_FAMILY);
+        if (cells == null) return;
+        for (Cell c : cells) {
+          if (c.getTypeByte() == KeyValue.Type.Delete.getCode() &&
+              Arrays.equals(CellUtil.cloneQualifier(c),
+                  MetaEditor.getDaughterReplicaQualifier(1))) {
+            rowsInDelete[deleteCellCount++] = CellUtil.cloneRow(c);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final Durability durability) {
+      if (e.getEnvironment().getRegion().getTableDesc().getTableName().
+          equals(TableName.META_TABLE_NAME) && shouldRunChecks) {
+        NavigableMap<byte[], List<Cell>> map = put.getFamilyCellMap();
+        List<Cell> cells = map.get(HConstants.CATALOG_FAMILY);
+        if (cells == null) return;
+        for (Cell c : cells) {
+          if (c.getTypeByte() == KeyValue.Type.Put.getCode()) {
+            if (Arrays.equals(CellUtil.cloneQualifier(c),
+                MetaEditor.getDaughterReplicaQualifier(1))) {
+              rowsInPut[putCellCount++] = CellUtil.cloneRow(c);
+            }
+          }
+        }
+      }
+    }
+  }
+
   public static class MockedRegionObserver extends BaseRegionObserver {
     private SplitTransaction st = null;
     private PairOfSameType<HRegion> daughterRegions = null;
@@ -1427,8 +1583,8 @@ public class TestSplitTransactionOnCluster {
       // Puts for daughters
       Put putA = MetaEditor.makePutFromRegionInfo(daughterRegions.getFirst().getRegionInfo());
       Put putB = MetaEditor.makePutFromRegionInfo(daughterRegions.getSecond().getRegionInfo());
-      st.addLocation(putA, rs.getServerName(), 1);
-      st.addLocation(putB, rs.getServerName(), 1);
+      st.addLocation(putA, rs.getServerName(), 1, 0);
+      st.addLocation(putB, rs.getServerName(), 1, 0);
       metaEntries.add(putA);
       metaEntries.add(putB);
     }
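Taken together, the MetaReader.getRegionLocations change at the top of this commit is what ties the scheme to the client: for each replicaId it first checks the daughter_NNNN column and, if present, substitutes the parent replica's HRegionInfo (and its last known location) until the real daughter replica opens. A minimal consumer sketch, assuming this patch is applied (the DaughterLookupDemo class and replicaOneLocation helper are illustrative, not part of the patch):

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.RegionLocations;
    import org.apache.hadoop.hbase.catalog.MetaReader;
    import org.apache.hadoop.hbase.client.Result;

    final class DaughterLookupDemo {
      /** Where does replica 1 of this (possibly just-split) region currently live? */
      static HRegionLocation replicaOneLocation(Result metaRow) {
        RegionLocations locs = MetaReader.getRegionLocations(metaRow);
        // Before the real daughter replica opens, getRegionLocations falls back to
        // the parent replica's HRegionInfo (read from daughter_0001), so this
        // location still points at the server hosting the old parent replica and
        // TIMELINE reads keep working during the split.
        return locs == null ? null : locs.getRegionLocation(1);
      }
    }

Once AssignmentManager.doSplittingOfReplicas opens the real daughter replica, MetaEditor.updateRegionLocation deletes daughter_0001 atomically with the location update, and subsequent lookups resolve to the daughter replica itself.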