diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
index 01ea8c1..c0c2789 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
@@ -186,7 +186,7 @@ public class MetaReader {
    * @return An {@link HTable} for hbase:meta
    * @throws IOException
    */
-  static HTable getMetaHTable(final CatalogTracker ct)
+  public static HTable getMetaHTable(final CatalogTracker ct)
   throws IOException {
     return getHTable(ct, TableName.META_TABLE_NAME);
   }
@@ -265,7 +265,7 @@ public class MetaReader {
   }
 
   /** Returns the row key to use for this regionInfo */
-  protected static byte[] getMetaKeyForRegion(HRegionInfo regionInfo) {
+  public static byte[] getMetaKeyForRegion(HRegionInfo regionInfo) {
     return RegionReplicaUtil.getRegionInfoForDefaultReplica(regionInfo).getRegionName();
   }
 
@@ -612,7 +612,7 @@ public class MetaReader {
    * Returns the column family used for meta columns.
    * @return HConstants.CATALOG_FAMILY.
    */
-  protected static byte[] getFamily() {
+  public static byte[] getFamily() {
     return HConstants.CATALOG_FAMILY;
   }
 
@@ -664,6 +664,19 @@ public class MetaReader {
   }
 
   /**
+   * Returns the column qualifier for the daughter column for the given replicaId.
+   * @param replicaId the replicaId of the region
+   * @return a byte[] for the daughter column qualifier
+   */
+  @VisibleForTesting
+  public static byte[] getDaughterReplicaQualifier(int replicaId) {
+    return replicaId == 0
+      ? HConstants.DAUGHTER_QUALIFIER
+      : Bytes.toBytes(HConstants.DAUGHTER_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+        + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
+  }
+
+  /**
    * Parses the replicaId from the server column qualifier. See top of the class javadoc
    * for the actual meta layout
    * @param serverColumn the column qualifier
@@ -744,8 +757,13 @@ public class MetaReader {
       if (replicaId < 0) {
         break;
       }
+      byte[] daughter = getDaughterReplicaQualifier(replicaId);
+      HRegionInfo h = getHRegionInfo(r, daughter);
+      if (h == null) {
+        h = regionInfo;
+      }
-      locations.add(getRegionLocation(r, regionInfo, replicaId));
+      locations.add(getRegionLocation(r, h, replicaId));
     }
 
     return new RegionLocations(locations);
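The new daughter column reuses the replica-qualifier encoding that MetaReader already applies
to the server/startcode/seqnum columns. A minimal standalone sketch of the resulting qualifier
strings, assuming META_REPLICA_ID_DELIMITER is "_" and HRegionInfo.REPLICA_ID_FORMAT is "%04X"
(the values used for the existing server columns; both are assumptions here, not part of this
patch):

    public class DaughterQualifierSketch {
      private static final String DELIMITER = "_";   // META_REPLICA_ID_DELIMITER (assumed)
      private static final String FORMAT = "%04X";   // HRegionInfo.REPLICA_ID_FORMAT (assumed)

      static String daughterQualifier(int replicaId) {
        return replicaId == 0
          ? "daughter"
          : "daughter" + DELIMITER + String.format(FORMAT, replicaId);
      }

      public static void main(String[] args) {
        System.out.println(daughterQualifier(0));   // daughter
        System.out.println(daughterQualifier(1));   // daughter_0001
        System.out.println(daughterQualifier(17));  // daughter_0011 (hex-formatted)
      }
    }

Because the plain "daughter" qualifier byte-sorts immediately before every "daughter_XXXX"
qualifier, a tailMap scan starting from it can pick up all replica pointers in one pass;
the CatalogJanitor change below relies on this ordering.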
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 0712469..00f582d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -401,6 +401,11 @@ public final class HConstants {
   /** The open seqnum column qualifier */
   public static final byte [] SEQNUM_QUALIFIER = Bytes.toBytes(SEQNUM_QUALIFIER_STR);
 
+  /** The daughter column qualifier, as a string */
+  public static final String DAUGHTER_QUALIFIER_STR = "daughter";
+  /** The daughter column qualifier */
+  public static final byte [] DAUGHTER_QUALIFIER = Bytes.toBytes(DAUGHTER_QUALIFIER_STR);
+
   /** The lower-half split region column qualifier */
   public static final byte [] SPLITA_QUALIFIER = Bytes.toBytes("splitA");
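Reading a daughter-reference cell back is an ordinary catalog-family Get. A hedged sketch of
what a caller could do with the new constants (the meta table handle, row key, and the helper
itself are illustrative assumptions, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.catalog.MetaReader;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;

    public class DaughterReferenceSketch {
      /** Returns the parent replica that a daughter's replica column still points to, or null. */
      static HRegionInfo readDaughterReference(HTable meta, byte[] daughterRow, int replicaId)
          throws IOException {
        Result r = meta.get(new Get(daughterRow));
        byte[] v = r.getValue(HConstants.CATALOG_FAMILY,
            MetaReader.getDaughterReplicaQualifier(replicaId));
        // null means the replica already opened (the column was deleted) or was never written
        return v == null ? null : HRegionInfo.parseFromOrNull(v);
      }
    }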
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
index e236d99..77270f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java
@@ -30,12 +30,15 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@@ -383,6 +386,9 @@ public class MetaEditor extends MetaReader {
     HRegionInfo copyOfParent = new HRegionInfo(parent);
     copyOfParent.setOffline(true);
     copyOfParent.setSplit(true);
+    byte[] key = MetaReader.getMetaKeyForRegion(parent);
+    Result result = meta.get(new Get(key));
+    RegionLocations rl = MetaReader.getRegionLocations(result);
 
     //Put for parent
     Put putParent = makePutFromRegionInfo(copyOfParent);
@@ -394,6 +400,21 @@ public class MetaEditor extends MetaReader {
     addLocation(putA, sn, 1, splitA.getReplicaId()); //new regions, openSeqNum = 1 is fine.
     addLocation(putB, sn, 1, splitB.getReplicaId());
+    // set the replicas to point to the locations of the old replicas
+    for (int i = 1; i < rl.size(); i++) {
+      ServerName s;
+      byte[] parentHri;
+      if (rl.getRegionLocation(i) == null) { // if null, we don't know anything about the replica
+        continue;
+      } else {
+        s = rl.getRegionLocation(i).getServerName();
+        parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray();
+      }
+      addLocation(putA, s, rl.getRegionLocation(i).getSeqNum(), i);
+      addLocation(putB, s, rl.getRegionLocation(i).getSeqNum(), i);
+      putA.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i),
+        parentHri);
+      putB.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i),
+        parentHri);
+    }
 
     byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
     multiMutate(meta, tableRow, putParent, putA, putB);
@@ -463,9 +484,20 @@ public class MetaEditor extends MetaReader {
       HRegionInfo regionInfo, ServerName sn, long openSeqNum) throws IOException {
     // region replicas are kept in the primary region's row
-    Put put = new Put(getMetaKeyForRegion(regionInfo));
+    byte[] metaRow = getMetaKeyForRegion(regionInfo);
+    Put put = new Put(metaRow);
     addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
-    putToCatalogTable(catalogTracker, put);
+    put.addImmutable(HConstants.CATALOG_FAMILY,
+      MetaReader.getServerColumn(regionInfo.getReplicaId()),
+      Bytes.toBytes(sn.getHostAndPort()));
+    if (regionInfo.getReplicaId() != 0) {
+      Delete d = new Delete(metaRow);
+      d.deleteColumn(HConstants.CATALOG_FAMILY,
+        MetaReader.getDaughterReplicaQualifier(regionInfo.getReplicaId()));
+      multiMutate(MetaReader.getMetaHTable(catalogTracker), metaRow, put, d);
+    } else {
+      putToCatalogTable(catalogTracker, put);
+    }
     LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
       " with server=" + sn);
   }
@@ -573,7 +605,7 @@ public class MetaEditor extends MetaReader {
     return p;
   }
 
-  private static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId){
+  public static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId) {
     p.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getServerColumn(replicaId),
       Bytes.toBytes(sn.getHostAndPort()));
     p.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getStartCodeColumn(replicaId),
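The lifecycle here is: at split time the daughters' replica columns are seeded with the
parent's replica locations plus a daughter -> parent pointer; when a daughter's replica
actually opens, updateRegionLocation writes the real location and drops the pointer in the
same meta-row update. The patch routes that pair through MetaEditor's private multiMutate;
the sketch below expresses the same put+delete pairing with the public single-row
RowMutations API, purely for illustration:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.catalog.MetaEditor;
    import org.apache.hadoop.hbase.catalog.MetaReader;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RowMutations;

    public class ReplicaOpenedSketch {
      /** On replica open: record the new location and clear the daughter -> parent pointer. */
      static void replicaOpened(HTable meta, HRegionInfo replica, ServerName sn, long openSeqNum)
          throws IOException {
        byte[] row = MetaReader.getMetaKeyForRegion(replica);
        Put put = MetaEditor.addLocation(new Put(row), sn, openSeqNum, replica.getReplicaId());
        Delete del = new Delete(row);
        del.deleteColumn(HConstants.CATALOG_FAMILY,
            MetaReader.getDaughterReplicaQualifier(replica.getReplicaId()));
        RowMutations rm = new RowMutations(row);  // atomic on a single meta row
        rm.add(put);
        rm.add(del);
        meta.mutateRow(rm);
      }
    }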
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index a7c3f69..99e3d2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -3555,6 +3556,13 @@ public class AssignmentManager extends ZooKeeperListener {
     }
 
     if (et == EventType.RS_ZK_REGION_SPLIT) {
+      // split replicas
+      try {
+        HRegionInfo parent = rs_p.getRegion();
+        doSplittingOfReplicas(parent, hri_a, hri_b,
+          ((MasterServices) server).getTableDescriptors().get(parent.getTable())
+            .getRegionReplication());
+      } catch (IOException e) {
+        LOG.warn("Failed to handle splits for replica regions", e);
+      }
       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
       // Remove region from ZK
       try {
@@ -3586,6 +3594,47 @@ public class AssignmentManager extends ZooKeeperListener {
     return true;
   }
 
+  private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
+      final HRegionInfo hri_b, final int numReplicas) {
+    // Create new region replicas for the daughters and assign them to match the
+    // current replica placement: if replica 1 of the parent was on RS1, replica 1
+    // of each daughter is assigned to RS1 as well.
+    Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
+    for (int i = 1; i < numReplicas; i++) {
+      prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
+      prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
+    }
+    try {
+      assign(map);
+    } catch (IOException e) {
+      LOG.warn("Caught exception while trying to assign replica(s) of daughter(s)", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Caught exception while trying to assign replica(s) of daughter(s)", e);
+    }
+    // unassign the old replicas
+    for (int i = 1; i < numReplicas; i++) {
+      HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(parentHri, i);
+      LOG.debug("Unassigning replica for split parent " + h);
+      unassign(h);
+    }
+  }
+
+  private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
+      int replicaId, Map<HRegionInfo, ServerName> map) {
+    HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
+    HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
+      replicaId);
+    LOG.debug("Created replica region for daughter " + daughterReplica);
+    ServerName sn;
+    if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
+      map.put(daughterReplica, sn);
+    } else {
+      List<ServerName> servers = serverManager.getOnlineServersList();
+      sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
+      map.put(daughterReplica, sn);
+    }
+  }
+
   /**
    * A region is offline. The new state should be the specified one,
    * if not null. If the specified state is null, the new state is Offline.
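doSplittingOfReplicas keeps each daughter replica on the server that hosted the matching
parent replica, so a split does not shuffle replica placement; only when the parent replica's
server is unknown does it fall back to a random online server. A standalone sketch of just
the placement rule (method and parameter names are mine, not the patch's):

    import java.util.List;
    import java.util.Random;
    import org.apache.hadoop.hbase.ServerName;

    public class ReplicaPlacementSketch {
      /** Same server as the parent's replica when known, else a random online server. */
      static ServerName placeDaughterReplica(ServerName parentReplicaServer,
          List<ServerName> onlineServers, Random rand) {
        if (parentReplicaServer != null) {
          return parentReplicaServer;  // preserve co-location across the split
        }
        return onlineServers.get(rand.nextInt(onlineServers.size()));
      }
    }

One nit carried over from the patch: seeding a fresh Random with System.currentTimeMillis()
on every call can hand back identical picks for calls landing in the same millisecond; a
single shared Random would be the safer idiom.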
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 59bc01e..0207029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -23,7 +23,11 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -110,12 +115,13 @@ public class CatalogJanitor extends Chore {
   /**
    * Scans hbase:meta and returns a number of scanned rows, and a map of merged
    * regions, and an ordered map of split parents.
-   * @return triple of scanned rows, map of merged regions and map of split
-   * parent regioninfos
+   * @return triple of scanned rows, map of merged regions, and a pair consisting of
+   * the map of split parent regioninfos and the set of parents to which daughters
+   * still hold back references [TODO: the signature is ugly; fix it]
    * @throws IOException
    */
-  Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents()
-      throws IOException {
+  Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+      getMergedRegionsAndSplitParents() throws IOException {
     return getMergedRegionsAndSplitParents(null);
   }
 
@@ -125,11 +131,13 @@ public class CatalogJanitor extends Chore {
    * null, return merged regions and split parents of all tables, else only the
    * specified table
    * @param tableName null represents all tables
-   * @return triple of scanned rows, and map of merged regions, and map of split
-   * parent regioninfos
+   * @return triple of scanned rows, and map of merged regions, and a pair consisting of
+   * the map of split parent regioninfos and the set of parents to which daughters
+   * still hold back references [TODO: the signature is ugly; fix it]
    * @throws IOException
    */
-  Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents(
+  Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+      getMergedRegionsAndSplitParents(
       final TableName tableName) throws IOException {
     final boolean isTableSpecified = (tableName != null);
     // TODO: Only works with single hbase:meta region currently.  Fix.
@@ -139,6 +147,7 @@ public class CatalogJanitor extends Chore {
     final Map<HRegionInfo, Result> splitParents =
       new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
     final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
+    final Set<HRegionInfo> parentsReferenced = new HashSet<HRegionInfo>();
     // This visitor collects split parents and counts rows in the hbase:meta table
 
     MetaScannerVisitor visitor = new MetaScanner.MetaScannerVisitorBase() {
@@ -153,6 +162,18 @@ public class CatalogJanitor extends Chore {
           // Another table, stop scanning
           return false;
         }
+        NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = r.getNoVersionMap();
+        NavigableMap<byte[], byte[]> infoMap = familyMap.get(MetaReader.getFamily());
+        byte[] daughterColumn = MetaReader.getDaughterReplicaQualifier(0);
+        SortedMap<byte[], byte[]> daughterMap = infoMap.tailMap(daughterColumn, false);
+        for (Entry<byte[], byte[]> entry : daughterMap.entrySet()) {
+          // qualifiers are byte-sorted; stop once we are past the daughter_xxxx columns
+          if (!Bytes.startsWith(entry.getKey(), HConstants.DAUGHTER_QUALIFIER)) break;
+          HRegionInfo h = HRegionInfo.parseFromOrNull(entry.getValue());
+          if (h == null) continue;
+          // the parent row should not be deleted while a daughter still references it
+          parentsReferenced.add(RegionReplicaUtil.getRegionInfoForDefaultReplica(h));
+        }
         if (info.isSplitParent()) splitParents.put(info, r);
         if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
           mergedRegions.put(info, r);
@@ -166,8 +187,9 @@ public class CatalogJanitor extends Chore {
     // the start row
     MetaScanner.metaScan(server.getConfiguration(), null, visitor, tableName);
 
-    return new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
-        count.get(), mergedRegions, splitParents);
+    return new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>>(count.get(), mergedRegions, new Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>(splitParents, parentsReferenced));
   }
 
   /**
@@ -216,8 +238,8 @@ public class CatalogJanitor extends Chore {
     if (!alreadyRunning.compareAndSet(false, true)) {
       return 0;
     }
-    Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple =
-      getMergedRegionsAndSplitParents();
+    Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>>
+      scanTriple = getMergedRegionsAndSplitParents();
     int count = scanTriple.getFirst();
     /**
      * clean merge regions first
@@ -244,13 +266,17 @@ public class CatalogJanitor extends Chore {
     /**
      * clean split parents
      */
-    Map<HRegionInfo, Result> splitParents = scanTriple.getThird();
+    Map<HRegionInfo, Result> splitParents = scanTriple.getThird().getFirst();
+    // Get the parents that are referenced from the daughter replicas, and don't
+    // delete the corresponding rows
+    Set<HRegionInfo> parentsReferenced = scanTriple.getThird().getSecond();
 
     // Now work on our list of found parents. See if any we can clean up.
    int splitCleaned = 0;
     // regions whose parents are still around
     HashSet<String> parentNotCleaned = new HashSet<String>();
     for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
+      if (parentsReferenced != null && parentsReferenced.contains(e.getKey())) continue;
       if (!parentNotCleaned.contains(e.getKey().getEncodedName()) &&
           cleanParent(e.getKey(), e.getValue())) {
         splitCleaned++;
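The tailMap walk above relies on byte-wise qualifier ordering within the info family:
"daughter" sorts immediately before every "daughter_XXXX" qualifier, but unrelated qualifiers
such as "regioninfo" or "server" sort after them too, which is why the loop must stop at the
first non-prefixed key rather than consume the whole tail. A toy demonstration of the
ordering (the qualifier names listed are illustrative):

    import java.util.SortedMap;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class QualifierOrderSketch {
      public static void main(String[] args) {
        TreeMap<byte[], String> info = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
        for (String q : new String[] {"daughter_0001", "daughter_0002", "mergeA",
            "regioninfo", "server", "server_0001", "splitA"}) {
          info.put(Bytes.toBytes(q), q);
        }
        // Everything sorting after "daughter": starts with the daughter pointers,
        // but keeps going into unrelated columns, hence the prefix check.
        SortedMap<byte[], String> tail = info.tailMap(Bytes.toBytes("daughter"));
        for (String q : tail.values()) {
          System.out.println(q);  // daughter_0001, daughter_0002, mergeA, regioninfo, ...
        }
      }
    }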
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 3394ccd..f9612ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -41,13 +41,18 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -588,30 +593,44 @@ public class SplitTransaction {
     copyOfParent.setOffline(true);
     copyOfParent.setSplit(true);
 
+    HTable meta = MetaReader.getMetaHTable(catalogTracker);
     //Put for parent
     Put putParent = MetaEditor.makePutFromRegionInfo(copyOfParent);
     MetaEditor.addDaughtersToPut(putParent, splitA, splitB);
     mutations.add(putParent);
+    byte[] key = MetaReader.getMetaKeyForRegion(parent);
+    Result result = meta.get(new Get(key));
+    RegionLocations rl = MetaReader.getRegionLocations(result);
 
     //Puts for daughters
     Put putA = MetaEditor.makePutFromRegionInfo(splitA);
     Put putB = MetaEditor.makePutFromRegionInfo(splitB);
 
-    addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
-    addLocation(putB, serverName, 1);
+    //these are new regions, openSeqNum = 1 is fine.
+    addLocation(putA, serverName, 1, 0);
+    addLocation(putB, serverName, 1, 0);
+    // set the replicas to point to the locations of the old replicas
+    for (int i = 1; i < rl.size(); i++) {
+      ServerName s;
+      byte[] parentHri;
+      if (rl.getRegionLocation(i) == null) { // if null, we don't know anything about the replica
+        continue;
+      } else {
+        s = rl.getRegionLocation(i).getServerName();
+        parentHri = rl.getRegionLocation(i).getRegionInfo().toByteArray();
+      }
+      addLocation(putA, s, rl.getRegionLocation(i).getSeqNum(), i);
+      addLocation(putB, s, rl.getRegionLocation(i).getSeqNum(), i);
+      putA.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i),
+        parentHri);
+      putB.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getDaughterReplicaQualifier(i),
+        parentHri);
+    }
     mutations.add(putA);
     mutations.add(putB);
     MetaEditor.mutateMetaTable(catalogTracker, mutations);
   }
 
-  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-      Bytes.toBytes(sn.getHostAndPort()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-      Bytes.toBytes(sn.getStartcode()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
-      Bytes.toBytes(openSeqNum));
-    return p;
+  public Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId) {
+    return MetaEditor.addLocation(p, sn, openSeqNum, replicaId);
   }
 
   /*
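This loop is a byte-for-byte twin of the one added to MetaEditor.splitRegion above; the two
split-time writers could plausibly share a helper along these lines (hypothetical, not part
of the patch):

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.RegionLocations;
    import org.apache.hadoop.hbase.catalog.MetaEditor;
    import org.apache.hadoop.hbase.catalog.MetaReader;
    import org.apache.hadoop.hbase.client.Put;

    public class ReplicaLocationCopySketch {
      /** Copies the parent's replica locations and daughter -> parent pointers into both puts. */
      static void copyParentReplicaLocations(Put putA, Put putB, RegionLocations parentLocations) {
        for (int i = 1; i < parentLocations.size(); i++) {
          HRegionLocation loc = parentLocations.getRegionLocation(i);
          if (loc == null) continue;  // nothing known about this replica
          byte[] parentHri = loc.getRegionInfo().toByteArray();
          MetaEditor.addLocation(putA, loc.getServerName(), loc.getSeqNum(), i);
          MetaEditor.addLocation(putB, loc.getServerName(), loc.getSeqNum(), i);
          putA.addImmutable(HConstants.CATALOG_FAMILY,
              MetaReader.getDaughterReplicaQualifier(i), parentHri);
          putB.addImmutable(HConstants.CATALOG_FAMILY,
              MetaReader.getDaughterReplicaQualifier(i), parentHri);
        }
      }
    }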
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 96bcdf5..104f635 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -90,7 +90,7 @@ public class TestReplicasClient {
     static final AtomicLong sleepTime = new AtomicLong(0);
     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
     static final AtomicInteger countOfNext = new AtomicInteger(0);
-    static final AtomicReference<CountDownLatch> cdl =
+    private static final AtomicReference<CountDownLatch> cdl =
       new AtomicReference<CountDownLatch>(new CountDownLatch(0));
 
     public SlowMeCopro() {
@@ -126,7 +126,7 @@ public class TestReplicasClient {
     private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
-        CountDownLatch latch = cdl.get();
+        CountDownLatch latch = getCdl().get();
         try {
           if (sleepTime.get() > 0) {
             LOG.info("Sleeping for " + sleepTime.get() + " ms");
@@ -145,6 +145,10 @@ public class TestReplicasClient {
         LOG.info("We're not the primary replicas.");
       }
     }
+
+    public static AtomicReference<CountDownLatch> getCdl() {
+      return cdl;
+    }
   }
 
   @BeforeClass
@@ -283,7 +287,7 @@
   public void testUseRegionWithoutReplica() throws Exception {
     byte[] b1 = "testUseRegionWithoutReplica".getBytes();
     openRegion(hriSecondary);
-    SlowMeCopro.cdl.set(new CountDownLatch(0));
+    SlowMeCopro.getCdl().set(new CountDownLatch(0));
     try {
       Get g = new Get(b1);
       Result r = table.get(g);
@@ -339,14 +343,14 @@
     byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
     openRegion(hriSecondary);
 
-    SlowMeCopro.cdl.set(new CountDownLatch(1));
+    SlowMeCopro.getCdl().set(new CountDownLatch(1));
     try {
       Get g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       Result r = table.get(g);
       Assert.assertTrue(r.isStale());
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       closeRegion(hriSecondary);
     }
   }
@@ -455,13 +459,13 @@
       LOG.info("sleep and is not stale done");
 
       // But if we ask for stale we will get it
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
 
       LOG.info("stale done");
@@ -474,14 +478,14 @@
       LOG.info("exists not stale done");
 
       // exists works on stale but don't see the put
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setCheckExistenceOnly(true);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
      Assert.assertFalse("The secondary has stale data", r.getExists());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       LOG.info("exists stale before flush done");
 
       HTU.getHBaseAdmin().flush(table.getTableName());
@@ -489,28 +493,28 @@
       Thread.sleep(1000 + REFRESH_PERIOD * 2);
 
       // get works and is not stale
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertFalse(r.isEmpty());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       LOG.info("stale done");
 
       // exists works on stale and we see the put after the flush
-      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setCheckExistenceOnly(true);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertTrue(r.getExists());
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       LOG.info("exists stale after flush done");
 
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       Delete d = new Delete(b1);
       table.delete(d);
@@ -576,7 +580,7 @@
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
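Making cdl private behind getCdl() is what lets the new split test below
(TestSplitTransactionOnCluster) drive the coprocessor's latch from another class. Every
caller follows the same pattern; in this fragment, row and table are placeholders:

    // Block reads on the primary replica, force a TIMELINE read to a secondary,
    // then release the primary. (row and table are placeholders.)
    SlowMeCopro.getCdl().set(new CountDownLatch(1));
    try {
      Get g = new Get(row);
      g.setConsistency(Consistency.TIMELINE);
      Result r = table.get(g);
      Assert.assertTrue(r.isStale());  // served by a secondary replica
    } finally {
      SlowMeCopro.getCdl().get().countDown();
    }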
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 8f0ae98..66b7fa2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Triple;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.Test;
@@ -613,16 +615,20 @@ public class TestCatalogJanitor {
       new byte[0]);
     Thread.sleep(1001);
 
+    final Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>> pair =
+      new Pair<Map<HRegionInfo, Result>, Set<HRegionInfo>>();
     final Map<HRegionInfo, Result> splitParents =
       new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
     splitParents.put(parent, createResult(parent, splita, splitb));
     splita.setOffline(true); //simulate that splita goes offline when it is split
     splitParents.put(splita, createResult(splita, splitaa,splitab));
+    pair.setFirst(splitParents);
+    pair.setSecond(null);
 
     final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
     CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
-    doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
-        10, mergedRegions, splitParents)).when(janitor)
+    doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Pair<Map<HRegionInfo, Result>,
+        Set<HRegionInfo>>>(10, mergedRegions, pair)).when(janitor)
       .getMergedRegionsAndSplitParents();
 
     //create ref from splita to parent
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 59bbd1f..018b117 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -55,9 +55,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -836,6 +840,82 @@ public class TestSplitTransactionOnCluster {
     }
   }
 
+  @Test
+  public void testSplitWithRegionReplicas() throws Exception {
+    ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL);
+    final TableName tableName = TableName.valueOf("foobar");
+    HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar");
+    htd.setRegionReplication(2);
+    htd.addCoprocessor(SlowMeCopro.class.getName());
+    // Create table then get the single region for our new table.
+    HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")},
+      TESTING_UTIL.getConfiguration());
+    TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
+    List<HRegion> regions = null;
+    try {
+      regions = cluster.getRegions(tableName);
+      int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName());
+      HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
+      insertData(tableName.getName(), admin, t);
+      // Turn off balancer so it doesn't cut in and mess up our placements.
+      admin.setBalancerRunning(false, true);
+      // Turn off the meta scanner so it doesn't remove the parent on us.
+      cluster.getMaster().setCatalogJanitorEnabled(false);
+      boolean tableExists = MetaReader.tableExists(regionServer.getCatalogTracker(), tableName);
+      assertEquals("The specified table should be present.", true, tableExists);
+      final HRegion region = findSplittableRegion(regions);
+      assertTrue("not able to find a splittable region", region != null);
+      regionServerIndex = cluster.getServerWith(region.getRegionName());
+      regionServer = cluster.getRegionServer(regionServerIndex);
+      String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
+        region.getRegionInfo().getEncodedName());
+      regionServer.getZooKeeper().sync(node);
+      LOG.debug("CURRENT REGION COUNT "
+        + TESTING_UTIL.getHBaseAdmin().getClusterStatus().getRegionsCount());
+      SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2"));
+      try {
+        st.prepare();
+        st.execute(regionServer, regionServer);
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail("Split execution should have succeeded with no exceptions thrown " + e);
+      }
+      TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
+      LOG.debug("CURRENT REGION COUNT "
+        + TESTING_UTIL.getHBaseAdmin().getClusterStatus().getRegionsCount());
+      tableExists = MetaReader.tableExists(regionServer.getCatalogTracker(), tableName);
+      assertEquals("The specified table should be present.", true, tableExists);
+      byte[] b1 = "row1".getBytes();
+      Get g = new Get(b1);
+      g.setConsistency(Consistency.STRONG);
+      // The following GET will make a trip to the meta to get the new location of the 1st
+      // daughter. In the process it will also get the location of the replica of the daughter
+      // (initially pointing to the parent's replica).
+      Result r = t.get(g);
+      Assert.assertFalse(r.isStale());
+      MetaReader.fullScanMetaAndPrint(new CatalogTracker(TESTING_UTIL.getConfiguration()));
+
+      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      // This will succeed because the previous GET also fetched the location of the replica
+      r = t.get(g);
+      Assert.assertTrue(r.isStale());
+      SlowMeCopro.getCdl().get().countDown();
+    } finally {
+      if (regions != null) {
+        String node = ZKAssign.getNodeName(zkw,
+          regions.get(0).getRegionInfo().getEncodedName());
+        ZKUtil.deleteNodeFailSilent(zkw, node);
+      }
+      admin.setBalancerRunning(true, false);
+      cluster.getMaster().setCatalogJanitorEnabled(true);
+      t.close();
+    }
+  }
+
   private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException,
       InterruptedException {
     Put p = new Put(Bytes.toBytes("row1"));
@@ -1083,7 +1163,7 @@ public class TestSplitTransactionOnCluster {
   private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {
     for (int i = 0; i < 5; ++i) {
       for (HRegion r: regions) {
-        if (r.isSplittable()) {
+        if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) {
           return(r);
         }
       }
@@ -1307,8 +1387,8 @@ public class TestSplitTransactionOnCluster {
     // Puts for daughters
     Put putA = MetaEditor.makePutFromRegionInfo(daughterRegions.getFirst().getRegionInfo());
     Put putB = MetaEditor.makePutFromRegionInfo(daughterRegions.getSecond().getRegionInfo());
-    st.addLocation(putA, rs.getServerName(), 1);
-    st.addLocation(putB, rs.getServerName(), 1);
+    st.addLocation(putA, rs.getServerName(), 1, 0);
+    st.addLocation(putB, rs.getServerName(), 1, 0);
     metaEntries.add(putA);
     metaEntries.add(putB);
   }