Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 1408174) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy) @@ -240,19 +240,32 @@ server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout); - this.journal.add(JournalEntry.STARTED_SPLITTING); // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't // have zookeeper so don't do zk stuff if server or zookeeper is null if (server != null && server.getZooKeeper() != null) { try { - this.znodeVersion = createNodeSplitting(server.getZooKeeper(), + createNodeSplitting(server.getZooKeeper(), this.parent.getRegionInfo(), server.getServerName()); } catch (KeeperException e) { - throw new IOException("Failed setting SPLITTING znode on " + + throw new IOException("Failed creating SPLITTING znode on " + this.parent.getRegionNameAsString(), e); } } this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); + if (server != null && server.getZooKeeper() != null) { + try { + // Transition node from SPLITTING to SPLITTING after creating the split node. + // Master will get the callback for node change only if the transition is successful. + // Note that if the transition fails then the rollback will delete the created znode + // as the journal entry SET_SPLITTING_IN_ZK is added. 
+ // TODO : Maybe we can add some new state to znode and handle the new state in case of success/failure + this.znodeVersion = transitionNodeSplitting(server.getZooKeeper(), + this.parent.getRegionInfo(), server.getServerName(), -1); + } catch (KeeperException e) { + throw new IOException("Failed setting SPLITTING znode on " + + this.parent.getRegionNameAsString(), e); + } + } createSplitDir(this.parent.getFilesystem(), this.splitdir); this.journal.add(JournalEntry.CREATE_SPLIT_DIR); @@ -750,15 +763,9 @@ JournalEntry je = iterator.previous(); switch(je) { - case STARTED_SPLITTING: - if (server != null && server.getZooKeeper() != null) { - cleanZK(server, this.parent.getRegionInfo(), false); - } - break; - case SET_SPLITTING_IN_ZK: if (server != null && server.getZooKeeper() != null) { - cleanZK(server, this.parent.getRegionInfo(), true); + cleanZK(server, this.parent.getRegionInfo()); } break; @@ -855,15 +862,11 @@ LOG.info("Cleaned up old failed split transaction detritus: " + splitdir); } - private static void cleanZK(final Server server, final HRegionInfo hri, boolean abort) { + private static void cleanZK(final Server server, final HRegionInfo hri) { try { // Only delete if its in expected state; could have been hijacked.
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), EventType.RS_ZK_REGION_SPLITTING); - } catch (KeeperException.NoNodeException nn) { - if (abort) { - server.abort("Failed cleanup of " + hri.getRegionNameAsString(), nn); - } } catch (KeeperException e) { server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java (revision 1408174) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java (working copy) @@ -19,23 +19,41 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.DeserializationException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.RegionTransition; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import 
org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.handler.SplitRegionHandler; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -67,6 +85,10 @@ private HBaseAdmin admin = null; private MiniHBaseCluster cluster = null; private static final int NB_SERVERS = 2; + private static CountDownLatch latch = new CountDownLatch(1); + private static boolean secondSplit = false; + private static boolean callRollBack = false; + private static boolean firstSplitCompleted = false; private static final HBaseTestingUtility TESTING_UTIL = new HBaseTestingUtility(); @@ -538,10 +560,107 @@ @Test public void testSplitBeforeSettingSplittingInZK() throws IOException, InterruptedException, KeeperException { - testSplitBeforeSettingSplittingInZK(true); - testSplitBeforeSettingSplittingInZK(false); + testSplitBeforeSettingSplittingInZKInternals(); } + @Test(timeout = 20000) + public void testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack() throws Exception { + final byte[] tableName = Bytes + .toBytes("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack"); + HBaseAdmin admin = new HBaseAdmin(TESTING_UTIL.getConfiguration()); + try { + // Create table then get the single region for our new table. 
+ HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor("cf")); + admin.createTable(htd); + HTable t = new HTable(cluster.getConfiguration(), tableName); + while (!(cluster.getRegions(tableName).size() == 1)) { + Thread.sleep(100); + } + final List regions = cluster.getRegions(tableName); + HRegionInfo hri = getAndCheckSingleTableRegion(regions); + int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName()); + final HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); + insertData(tableName, admin, t); + // Turn off balancer so it doesn't cut in and mess up our placements. + this.admin.setBalancerRunning(false, false); + // Turn off the meta scanner so it don't remove parent on us. + cluster.getMaster().setCatalogJanitorEnabled(false); + + new Thread() { + public void run() { + SplitTransaction st = null; + st = new MockedSplitTransaction(regions.get(0), Bytes.toBytes("row2")); + try { + st.prepare(); + st.execute(regionServer, regionServer); + } catch (IOException e) { + + } + } + }.start(); + while (!callRollBack) { + Thread.sleep(100); + } + SplitTransaction st = null; + st = new MockedSplitTransaction(regions.get(0), Bytes.toBytes("row2")); + try { + secondSplit = true; + st.prepare(); + st.execute(regionServer, regionServer); + } catch (IOException e) { + LOG.debug("Rollback started :"+ e.getMessage()); + st.rollback(regionServer, regionServer); + } + while (!firstSplitCompleted) { + Thread.sleep(100); + } + RegionStates regionStates = cluster.getMaster().getAssignmentManager().getRegionStates(); + Map rit = regionStates.getRegionsInTransition(); + + while (rit.containsKey(hri.getTableNameAsString())) { + Thread.sleep(100); + } + List onlineRegions = regionServer.getOnlineRegions(tableName); + // Region server side split is successful. 
+ assertEquals("The parent region should be splitted", 2, onlineRegions.size()); + //Should be present in RIT + List regionsOfTable = cluster.getMaster().getAssignmentManager() + .getRegionStates().getRegionsOfTable(tableName); + // Master side should also reflect the same + assertEquals("No of regions in master", 2, regionsOfTable.size()); + } finally { + admin.setBalancerRunning(true, false); + secondSplit = false; + firstSplitCompleted = false; + callRollBack = false; + cluster.getMaster().setCatalogJanitorEnabled(true); + if (admin.isTableAvailable(tableName) && admin.isTableEnabled(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + admin.close(); + } + } + } + + private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException, + InterruptedException { + Put p = new Put(Bytes.toBytes("row1")); + p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1")); + t.put(p); + p = new Put(Bytes.toBytes("row2")); + p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2")); + t.put(p); + p = new Put(Bytes.toBytes("row3")); + p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3")); + t.put(p); + p = new Put(Bytes.toBytes("row4")); + p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4")); + t.put(p); + admin.flush(tableName); + } + + @Test public void testShouldThrowIOExceptionIfStoreFileSizeIsEmptyAndSHouldSuccessfullyExecuteRollback() throws Exception { @@ -582,7 +701,7 @@ } - private void testSplitBeforeSettingSplittingInZK(boolean nodeCreated) throws IOException, + private void testSplitBeforeSettingSplittingInZKInternals() throws IOException, KeeperException { final byte[] tableName = Bytes.toBytes("testSplitBeforeSettingSplittingInZK"); HBaseAdmin admin = TESTING_UTIL.getHBaseAdmin(); @@ -596,17 +715,9 @@ int regionServerIndex = cluster.getServerWith(regions.get(0).getRegionName()); HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); 
SplitTransaction st = null; - if (nodeCreated) { + { st = new MockedSplitTransaction(regions.get(0), null) { @Override - int transitionNodeSplitting(ZooKeeperWatcher zkw, HRegionInfo parent, - ServerName serverName, int version) throws KeeperException, IOException { - throw new IOException(); - } - }; - } else { - st = new MockedSplitTransaction(regions.get(0), null) { - @Override int createNodeSplitting(ZooKeeperWatcher zkw, HRegionInfo region, ServerName serverName) throws KeeperException, IOException { throw new IOException(); @@ -618,9 +729,7 @@ } catch (IOException e) { String node = ZKAssign.getNodeName(regionServer.getZooKeeper(), regions.get(0) .getRegionInfo().getEncodedName()); - if (nodeCreated) { - assertFalse(ZKUtil.checkExists(regionServer.getZooKeeper(), node) == -1); - } else { + { assertTrue(ZKUtil.checkExists(regionServer.getZooKeeper(), node) == -1); } assertTrue(st.rollback(regionServer, regionServer)); @@ -636,9 +745,44 @@ public static class MockedSplitTransaction extends SplitTransaction { + private HRegion currentRegion; public MockedSplitTransaction(HRegion r, byte[] splitrow) { super(r, splitrow); + this.currentRegion = r; } + + @Override + void transitionZKNode(Server server, RegionServerServices services, HRegion a, HRegion b) + throws IOException { + if (this.currentRegion.getRegionInfo().getTableNameAsString() + .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { + try { + if (!secondSplit){ + callRollBack = true; + latch.await(); + } + } catch (InterruptedException e) { + } + + } + super.transitionZKNode(server, services, a, b); + if (this.currentRegion.getRegionInfo().getTableNameAsString() + .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { + firstSplitCompleted = true; + } + } + @Override + public boolean rollback(Server server, RegionServerServices services) throws IOException { + if (this.currentRegion.getRegionInfo().getTableNameAsString() + 
.equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) { + if(secondSplit){ + super.rollback(server, services); + latch.countDown(); + return true; + } + } + return super.rollback(server, services); + } }