diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 2888e1e..11b9ec2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -30,8 +30,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadFactory; @@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.Result; @@ -148,6 +151,8 @@ public class AssignmentManager extends ZooKeeperListener { final private KeyLocker locker = new KeyLocker(); + Set replicasToClose = Collections.synchronizedSet(new HashSet()); + /** * Map of regions to reopen after the schema of a table is changed. Key - * encoded region name, value - HRegionInfo @@ -616,6 +621,12 @@ public class AssignmentManager extends ZooKeeperListener { LOG.info("Clean cluster startup. Assigning user regions"); assignAllUserRegions(allRegions); } + // unassign replicas of the split parents and the merged regions + // the daughter replicas are opened in assignAllUserRegions if it was + // not already opened. 
+ for (HRegionInfo h : replicasToClose) { + unassign(h); + } return failover; } @@ -798,7 +809,11 @@ public class AssignmentManager extends ZooKeeperListener { case RS_ZK_REGION_FAILED_OPEN: // Region is closed, insert into RIT and handle it regionStates.updateRegionState(regionInfo, State.CLOSED, sn); - invokeAssign(regionInfo); + if (!replicasToClose.contains(regionInfo)) { + invokeAssign(regionInfo); + } else { + offlineDisabledRegion(regionInfo); + } break; case M_ZK_REGION_OFFLINE: @@ -2207,7 +2222,7 @@ public class AssignmentManager extends ZooKeeperListener { private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { if (this.tableStateManager.isTableState(region.getTable(), ZooKeeperProtos.Table.State.DISABLED, - ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) { LOG.info("Table " + region.getTable() + " is disabled or disabling;" + " skipping assign of " + region.getRegionNameAsString()); offlineDisabledRegion(region); @@ -2461,7 +2476,7 @@ public class AssignmentManager extends ZooKeeperListener { lock.unlock(); // Region is expected to be reassigned afterwards - if (reassign && regionStates.isRegionOffline(region)) { + if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) { assign(region, true); } } @@ -2768,6 +2783,19 @@ public class AssignmentManager extends ZooKeeperListener { LOG.debug("null result from meta - ignoring but this is strange."); continue; } + // Keep track of replicas to close. These were the replicas of the originally + // unmerged regions. The master should have closed them before, but it may not + // have done so, e.g. because it crashed. + PairOfSameType p = MetaTableAccessor.getMergeRegions(result); + if (p.getFirst() != null && p.getSecond() != null) { + int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst(). 
+ getTable()).getRegionReplication(); + for (HRegionInfo merge : p) { + for (int i = 1; i < numReplicas; i++) { + replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i)); + } + } + } RegionLocations rl = MetaTableAccessor.getRegionLocations(result); if (rl == null) continue; HRegionLocation[] locations = rl.getRegionLocations(); @@ -2777,6 +2805,14 @@ public class AssignmentManager extends ZooKeeperListener { if (regionInfo == null) continue; int replicaId = regionInfo.getReplicaId(); State state = RegionStateStore.getRegionState(result, replicaId); + // Keep track of replicas to close. These were the replicas of the split parents + // from the previous life of the master. The master should have closed them before, + // but it may not have been able to, e.g. because it crashed. + if (replicaId == 0 && state.equals(State.SPLIT)) { + for (HRegionLocation h : locations) { + replicasToClose.add(h.getRegionInfo()); + } + } ServerName lastHost = hrl.getServerName(); ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId); regionStates.createRegionState(regionInfo, state, regionLocation, lastHost); @@ -3380,7 +3416,8 @@ public class AssignmentManager extends ZooKeeperListener { // When there are more than one region server a new RS is selected as the // destination and the same is updated in the region plan. 
(HBASE-5546) if (getTableStateManager().isTableState(hri.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + replicasToClose.contains(hri)) { offlineDisabledRegion(hri); return; } @@ -3420,7 +3457,8 @@ public class AssignmentManager extends ZooKeeperListener { private void onRegionClosed(final HRegionInfo hri) { if (getTableStateManager().isTableState(hri.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + replicasToClose.contains(hri)) { offlineDisabledRegion(hri); return; } @@ -3432,8 +3470,8 @@ public class AssignmentManager extends ZooKeeperListener { } private String onRegionSplit(ServerName sn, TransitionCode code, - HRegionInfo p, HRegionInfo a, HRegionInfo b) { - RegionState rs_p = regionStates.getRegionState(p); + final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { + final RegionState rs_p = regionStates.getRegionState(p); RegionState rs_a = regionStates.getRegionState(a); RegionState rs_b = regionStates.getRegionState(b); if (!(rs_p.isOpenOrSplittingOnServer(sn) @@ -3459,6 +3497,15 @@ public class AssignmentManager extends ZooKeeperListener { ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { invokeUnAssign(a); invokeUnAssign(b); + } else { + Callable splitReplicasCallable = new Callable() { + @Override + public Object call() { + doSplittingOfReplicas(p, a, b); + return null; + } + }; + threadPoolExecutorService.submit(splitReplicasCallable); } } else if (code == TransitionCode.SPLIT_PONR) { try { @@ -3481,7 +3528,7 @@ public class AssignmentManager extends ZooKeeperListener { } private String onRegionMerge(ServerName sn, TransitionCode code, - HRegionInfo p, HRegionInfo a, HRegionInfo b) { + final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { 
RegionState rs_p = regionStates.getRegionState(p); RegionState rs_a = regionStates.getRegionState(a); RegionState rs_b = regionStates.getRegionState(b); @@ -3508,6 +3555,15 @@ public class AssignmentManager extends ZooKeeperListener { if (getTableStateManager().isTableState(p.getTable(), ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { invokeUnAssign(p); + } else { + Callable mergeReplicasCallable = new Callable() { + @Override + public Object call() { + doMergingOfReplicas(p, a, b); + return null; + } + }; + threadPoolExecutorService.submit(mergeReplicasCallable); } } else if (code == TransitionCode.MERGE_PONR) { try { @@ -3614,6 +3670,7 @@ public class AssignmentManager extends ZooKeeperListener { } if (et == EventType.RS_ZK_REGION_MERGED) { + doMergingOfReplicas(p, hri_a, hri_b); LOG.debug("Handling MERGED event for " + encodedName + "; deleting node"); // Remove region from ZK try { @@ -3740,6 +3797,8 @@ public class AssignmentManager extends ZooKeeperListener { } if (et == EventType.RS_ZK_REGION_SPLIT) { + // split replicas + doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b); LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node"); // Remove region from ZK try { @@ -3772,6 +3831,110 @@ public class AssignmentManager extends ZooKeeperListener { return true; } + private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a, + final HRegionInfo hri_b) { + // Close replicas for the original unmerged regions. create/assign new replicas + // for the merged parent. 
+ List unmergedRegions = new ArrayList(); + unmergedRegions.add(hri_a); + unmergedRegions.add(hri_b); + Map> map = regionStates.getRegionAssignments(unmergedRegions); + Collection> c = map.values(); + for (List l : c) { + for (HRegionInfo h : l) { + if (!RegionReplicaUtil.isDefaultReplica(h)) { + LOG.debug("Unassigning un-merged replica " + h); + unassign(h); + } + } + } + int numReplicas = 1; + try { + numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()). + getRegionReplication(); + } catch (IOException e) { + LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() + + " due to " + e.getMessage() + ". The assignment of replicas for the merged region " + + "will not be done"); + } + List regions = new ArrayList(); + for (int i = 1; i < numReplicas; i++) { + regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i)); + } + try { + assign(regions); + } catch (IOException ioe) { + LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " + + ioe.getMessage()); + } catch (InterruptedException ie) { + LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " + + ie.getMessage()); + } + } + + private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a, + final HRegionInfo hri_b) { + // create new regions for the replica, and assign them to match with the + // current replica assignments. If replica1 of parent is assigned to RS1, + // the replica1s of daughters will be on the same machine + int numReplicas = 1; + try { + numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()). + getRegionReplication(); + } catch (IOException e) { + LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() + + " due to " + e.getMessage() + ". 
The assignment of daughter replicas " + + "replicas will not be done"); + } + // unassign the old replicas + List parentRegion = new ArrayList(); + parentRegion.add(parentHri); + Map> currentAssign = + regionStates.getRegionAssignments(parentRegion); + Collection> c = currentAssign.values(); + for (List l : c) { + for (HRegionInfo h : l) { + if (!RegionReplicaUtil.isDefaultReplica(h)) { + LOG.debug("Unassigning parent's replica " + h); + unassign(h); + } + } + } + // assign daughter replicas + Map map = new HashMap(); + for (int i = 1; i < numReplicas; i++) { + prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map); + prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map); + } + try { + assign(map); + } catch (IOException e) { + LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)"); + } catch (InterruptedException e) { + LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)"); + } + } + + private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri, + int replicaId, Map map) { + HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId); + HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri, + replicaId); + LOG.debug("Created replica region for daughter " + daughterReplica); + ServerName sn; + if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) { + map.put(daughterReplica, sn); + } else { + List servers = serverManager.getOnlineServersList(); + sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size())); + map.put(daughterReplica, sn); + } + } + + public Set getReplicasToClose() { + return replicasToClose; + } + /** * A region is offline. The new state should be the specified one, * if not null. If the specified state is null, the new state is Offline. 
@@ -3786,6 +3949,25 @@ public class AssignmentManager extends ZooKeeperListener { // Tell our listeners that a region was closed sendRegionClosedNotification(regionInfo); + // also note that all the replicas of the primary should be closed + if (state != null && state.equals(State.SPLIT)) { + Collection c = new ArrayList(1); + c.add(regionInfo); + Map> map = regionStates.getRegionAssignments(c); + Collection> allReplicas = map.values(); + for (List list : allReplicas) { + replicasToClose.addAll(list); + } + } + else if (state != null && state.equals(State.MERGED)) { + Collection c = new ArrayList(1); + c.add(regionInfo); + Map> map = regionStates.getRegionAssignments(c); + Collection> allReplicas = map.values(); + for (List list : allReplicas) { + replicasToClose.addAll(list); + } + } } private void sendRegionOpenedNotification(final HRegionInfo regionInfo, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java index 3bb5220..b01434e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java @@ -93,7 +93,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName()); // Check if this table is being disabled or not if (this.assignmentManager.getTableStateManager().isTableState(this.regionInfo.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + assignmentManager.getReplicasToClose().contains(regionInfo)) { assignmentManager.offlineDisabledRegion(regionInfo); return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 607d042..f6d798a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -266,7 +266,8 @@ public class ServerShutdownHandler extends EventHandler { } else if (rit != null) { if (rit.isPendingCloseOrClosing() && am.getTableStateManager().isTableState(hri.getTable(), - ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { + ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || + am.getReplicasToClose().contains(hri)) { // If the table was partially disabled and the RS went down, we should clear the RIT // and remove the node for the region. // The rit that we use may be stale in case the table was in DISABLING state diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 7975c51..618b03c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -149,6 +149,23 @@ public class StoreFileInfo implements Comparable { } /** + * Create a Store File Info from a Reference + * @param conf + * @param fs + * @param fileStatus + * @param reference + * @throws IOException + */ + public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus, + final Reference reference) + throws IOException { + this.conf = conf; + this.fileStatus = fileStatus; + this.reference = reference; + this.link = null; + } + + /** * Sets the region coprocessor env. 
* @param coprocessorHost */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 237e316..74a74a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; @@ -83,6 +84,11 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { return new StoreFileInfo(conf, fs, status); } + if (StoreFileInfo.isReference(status.getPath())) { + Reference reference = Reference.read(fs, status.getPath()); + return new StoreFileInfo(conf, fs, status, reference); + } + // else create a store file link. The link file does not exists on filesystem though. 
HFileLink link = new HFileLink(conf, HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 1f1f3d4..bc3a1f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -92,7 +92,7 @@ public class TestReplicasClient { static final AtomicLong sleepTime = new AtomicLong(0); static final AtomicBoolean slowDownNext = new AtomicBoolean(false); static final AtomicInteger countOfNext = new AtomicInteger(0); - static final AtomicReference cdl = + private static final AtomicReference cdl = new AtomicReference(new CountDownLatch(0)); Random r = new Random(); public SlowMeCopro() { @@ -129,7 +129,7 @@ public class TestReplicasClient { private void slowdownCode(final ObserverContext e) { if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { - CountDownLatch latch = cdl.get(); + CountDownLatch latch = getCdl().get(); try { if (sleepTime.get() > 0) { LOG.info("Sleeping for " + sleepTime.get() + " ms"); @@ -148,6 +148,10 @@ public class TestReplicasClient { LOG.info("We're not the primary replicas."); } } + + public static AtomicReference getCdl() { + return cdl; + } } @BeforeClass @@ -291,7 +295,7 @@ public class TestReplicasClient { public void testUseRegionWithoutReplica() throws Exception { byte[] b1 = "testUseRegionWithoutReplica".getBytes(); openRegion(hriSecondary); - SlowMeCopro.cdl.set(new CountDownLatch(0)); + SlowMeCopro.getCdl().set(new CountDownLatch(0)); try { Get g = new Get(b1); Result r = table.get(g); @@ -347,14 +351,14 @@ public class TestReplicasClient { byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); openRegion(hriSecondary); - SlowMeCopro.cdl.set(new CountDownLatch(1)); + 
SlowMeCopro.getCdl().set(new CountDownLatch(1)); try { Get g = new Get(b1); g.setConsistency(Consistency.TIMELINE); Result r = table.get(g); Assert.assertTrue(r.isStale()); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); closeRegion(hriSecondary); } } @@ -465,13 +469,13 @@ public class TestReplicasClient { LOG.info("sleep and is not stale done"); // But if we ask for stale we will get it - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("stale done"); @@ -484,14 +488,14 @@ public class TestReplicasClient { LOG.info("exists not stale done"); // exists works on stale but don't see the put - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse("The secondary has stale data", r.getExists()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("exists stale before flush done"); flushRegion(hriPrimary); @@ -500,28 +504,28 @@ public class TestReplicasClient { Thread.sleep(1000 + REFRESH_PERIOD * 2); // get works and is not stale - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new CountDownLatch(1)); g = new Get(b1); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertFalse(r.isEmpty()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("stale done"); // exists works on stale and we see the put after the flush - SlowMeCopro.cdl.set(new CountDownLatch(1)); + SlowMeCopro.getCdl().set(new 
CountDownLatch(1)); g = new Get(b1); g.setCheckExistenceOnly(true); g.setConsistency(Consistency.TIMELINE); r = table.get(g); Assert.assertTrue(r.isStale()); Assert.assertTrue(r.getExists()); - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); LOG.info("exists stale after flush done"); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); Delete d = new Delete(b1); table.delete(d); @@ -587,7 +591,7 @@ public class TestReplicasClient { SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); } finally { - SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.getCdl().get().countDown(); SlowMeCopro.sleepTime.set(0); SlowMeCopro.slowDownNext.set(false); SlowMeCopro.countOfNext.set(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index 6981415..0b811ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.math.RandomUtils; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -287,6 +289,45 @@ public class TestRegionMergeTransactionOnCluster { } } + @Test + public 
void testMergeWithReplicas() throws Exception { + final TableName tableName = TableName.valueOf("testMergeWithReplicas"); + // Create table and load data. + createTableAndLoadData(master, tableName, 5, 2); + List> initialRegionToServers = + MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(), + master.getShortCircuitConnection(), tableName); + // Merge 1st and 2nd region + PairOfSameType mergedRegions = mergeRegionsAndVerifyRegionNum(master, tableName, + 0, 2, 5 * 2 - 2); + List> currentRegionToServers = + MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(), + master.getShortCircuitConnection(), tableName); + List initialRegions = new ArrayList(); + for (Pair p : initialRegionToServers) { + initialRegions.add(p.getFirst()); + } + List currentRegions = new ArrayList(); + for (Pair p : currentRegionToServers) { + currentRegions.add(p.getFirst()); + } + assertTrue(initialRegions.contains(mergedRegions.getFirst())); //this is the first region + assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getFirst(), 1))); //this is the replica of the first region + assertTrue(initialRegions.contains(mergedRegions.getSecond())); //this is the second region + assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getSecond(), 1))); //this is the replica of the second region + assertTrue(!initialRegions.contains(currentRegions.get(0))); //this is the new region + assertTrue(!initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + currentRegions.get(0), 1))); //replica of the new region + assertTrue(currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + currentRegions.get(0), 1))); //replica of the new region + assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + mergedRegions.getFirst(), 1))); //replica of the merged region + assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica( + 
mergedRegions.getSecond(), 1))); //replica of the merged region + } + private PairOfSameType mergeRegionsAndVerifyRegionNum( HMaster master, TableName tablename, int regionAnum, int regionBnum, int expectedRegionNum) throws Exception { @@ -335,11 +376,11 @@ public class TestRegionMergeTransactionOnCluster { private HTable createTableAndLoadData(HMaster master, TableName tablename) throws Exception { - return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM); + return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1); } private HTable createTableAndLoadData(HMaster master, TableName tablename, - int numRegions) throws Exception { + int numRegions, int replication) throws Exception { assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions); byte[][] splitRows = new byte[numRegions - 1][]; for (int i = 0; i < splitRows.length; i++) { @@ -347,6 +388,9 @@ public class TestRegionMergeTransactionOnCluster { } HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows); + if (replication > 1) { + HBaseTestingUtility.setReplicas(admin, tablename, replication); + } loadData(table); verifyRowCount(table, ROWSIZE); @@ -356,7 +400,7 @@ public class TestRegionMergeTransactionOnCluster { while (System.currentTimeMillis() < timeout) { tableRegions = MetaTableAccessor.getTableRegionsAndLocations( master.getZooKeeper(), master.getShortCircuitConnection(), tablename); - if (tableRegions.size() == numRegions) + if (tableRegions.size() == numRegions * replication) break; Thread.sleep(250); } @@ -364,7 +408,7 @@ public class TestRegionMergeTransactionOnCluster { tableRegions = MetaTableAccessor.getTableRegionsAndLocations( master.getZooKeeper(), master.getShortCircuitConnection(), tablename); LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions)); - assertEquals(numRegions, tableRegions.size()); + assertEquals(numRegions * replication, tableRegions.size()); return table; } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index b7626e9..3f03307 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -57,7 +57,9 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; @@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro; import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination; import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; @@ -924,6 +927,84 @@ public class TestSplitTransactionOnCluster { } } + @Test + public void testSplitWithRegionReplicas() throws Exception { + ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL); + final TableName tableName = + TableName.valueOf("foobar"); + HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar"); + htd.setRegionReplication(2); + htd.addCoprocessor(SlowMeCopro.class.getName()); + // Create table then get the single region for our new table. 
+ HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")}, + TESTING_UTIL.getConfiguration()); + int count; + do { + count = cluster.getRegions(tableName).size(); + } while (count != 2); + List regions = null; + try { + regions = cluster.getRegions(tableName); + int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName()); + HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); + insertData(tableName.getName(), admin, t); + // Turn off balancer so it doesn't cut in and mess up our placements. + admin.setBalancerRunning(false, true); + // Turn off the meta scanner so it don't remove parent on us. + cluster.getMaster().setCatalogJanitorEnabled(false); + boolean tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(), + tableName); + assertEquals("The specified table should be present.", true, tableExists); + final HRegion region = findSplittableRegion(regions); + regionServerIndex = cluster.getServerWith(region.getRegionName()); + regionServer = cluster.getRegionServer(regionServerIndex); + assertTrue("not able to find a splittable region", region != null); + String node = ZKAssign.getNodeName(regionServer.getZooKeeper(), + region.getRegionInfo().getEncodedName()); + regionServer.getZooKeeper().sync(node); + SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2")); + try { + st.prepare(); + st.execute(regionServer, regionServer); + } catch (IOException e) { + e.printStackTrace(); + fail("Split execution should have succeeded with no exceptions thrown " + e); + } + TESTING_UTIL.waitUntilAllRegionsAssigned(tableName); + tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(), + tableName); + assertEquals("The specified table should be present.", true, tableExists); + // exists works on stale and we see the put after the flush + byte[] b1 = "row1".getBytes(); + Get g = new Get(b1); + g.setConsistency(Consistency.STRONG); + // The following 
GET will make a trip to the meta to get the new location of the 1st daughter + // In the process it will also get the location of the replica of the daughter (initially + // pointing to the parent's replica) + Result r = t.get(g); + Assert.assertFalse(r.isStale()); + LOG.info("exists stale after flush done"); + + SlowMeCopro.getCdl().set(new CountDownLatch(1)); + g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + // This will succeed because in the previous GET we get the location of the replica + r = t.get(g); + Assert.assertTrue(r.isStale()); + SlowMeCopro.getCdl().get().countDown(); + } finally { + SlowMeCopro.getCdl().get().countDown(); + if (regions != null) { + String node = ZKAssign.getNodeName(zkw, regions.get(0).getRegionInfo() + .getEncodedName()); + ZKUtil.deleteNodeFailSilent(zkw, node); + } + admin.setBalancerRunning(true, false); + cluster.getMaster().setCatalogJanitorEnabled(true); + t.close(); + } + } + private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException, InterruptedException { Put p = new Put(Bytes.toBytes("row1")); @@ -1192,7 +1273,7 @@ public class TestSplitTransactionOnCluster { private HRegion findSplittableRegion(final List regions) throws InterruptedException { for (int i = 0; i < 5; ++i) { for (HRegion r: regions) { - if (r.isSplittable()) { + if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) { return(r); } }