Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 7cc3e966236c268f073be5ea3e6619392a4dd085) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision ) @@ -144,6 +144,7 @@ } public synchronized boolean requestSplit(final HRegion r) { + waitForInflightSchemaChange(r.getRegionInfo().getTableNameAsString()); // don't split regions that are blocking if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) { byte[] midKey = r.checkSplit(); @@ -155,6 +156,32 @@ return false; } + /** + * Wait for mid-flight schema alter requests (if any). We don't want to execute a split + * when a schema alter is in progress as we end up in an inconsistent state. + * An interrupt does not end the wait; the interrupt status is restored on return. + * NOTE(review): requestSplit(HRegion) is synchronized, so its monitor is held while we wait. + * @param tableName name of the table to check for an in-progress schema change + */ + private void waitForInflightSchemaChange(String tableName) { + boolean interrupted = false; + try { + while (this.server.getSchemaChangeTracker() + .isSchemaChangeInProgress(tableName)) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Re-interrupting here would make each later sleep() throw at once; defer it. + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + public synchronized void requestSplit(final HRegion r, byte[] midKey) { if (midKey == null) { LOG.debug("Region " + r.getRegionNameAsString() + Index: src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (date 1320381532000) +++ src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java (revision ) @@ -454,11 +454,11 @@ @Test - public void testConcurrentInstantSchemaChangeAndTableSplit() throws IOException, + public void testConcurrentSplitAndInstantSchemaChange() throws IOException, InterruptedException, KeeperException { - final String tableName = "testInstantSchemaChangeDuringTableSplit"; + final String tableName =
"testConcurrentSplitAndInstantSchemaChange"; conf = TEST_UTIL.getConfiguration(); - LOG.info("Start testInstantSchemaChangeDuringTableSplit()"); + LOG.info("Start testConcurrentSplitAndInstantSchemaChange()"); final String newFamily = "newFamily"; HTable ht = createTableAndValidate(tableName); final MasterSchemaChangeTracker msct = @@ -466,7 +466,7 @@ - // create now 10 regions + // create 4 regions TEST_UTIL.createMultiRegions(conf, ht, - HConstants.CATALOG_FAMILY, 10); + HConstants.CATALOG_FAMILY, 4); int rowCount = TEST_UTIL.loadTable(ht, HConstants.CATALOG_FAMILY); //assertRowCount(t, rowCount); @@ -497,17 +497,16 @@ }; splitter.run(); - Thread.sleep(100); schemaChanger.run(); - waitForSchemaChangeProcess(tableName); - assertFalse(msct.doesSchemaChangeNodeExists(tableName)); + waitForSchemaChangeProcess(tableName, 40000); - Put put1 = new Put(row); put1.add(Bytes.toBytes(newFamily), qualifier, value); LOG.info("******** Put into new column family "); ht.put(put1); + ht.flushCommits(); + LOG.info("******** Get from new column family "); Get get1 = new Get(row); get1.addColumn(Bytes.toBytes(newFamily), qualifier); Result r = ht.get(get1); @@ -516,10 +515,73 @@ int result = Bytes.compareTo(value, tvalue); assertEquals(result, 0); + LOG.info("End testConcurrentSplitAndInstantSchemaChange() "); + } - LOG.info("End testInstantSchemaChangeDuringTableSplit() "); + @Test + public void testConcurrentInstantSchemaChangeAndSplit() throws IOException, + InterruptedException, KeeperException { + final String tableName = "testConcurrentInstantSchemaChangeAndSplit"; + conf = TEST_UTIL.getConfiguration(); + LOG.info("Start testConcurrentInstantSchemaChangeAndSplit()"); + final String newFamily = "newFamily"; + HTable ht = createTableAndValidate(tableName); + final MasterSchemaChangeTracker msct = + TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); + + // create 4 regions + TEST_UTIL.createMultiRegions(conf, ht, + HConstants.CATALOG_FAMILY, 4); + int rowCount =
TEST_UTIL.loadTable(ht, HConstants.CATALOG_FAMILY); + //assertRowCount(t, rowCount); + + Runnable splitter = new Runnable() { + public void run() { + // run the splits now. + try { + LOG.info("Splitting table now "); + admin.split(Bytes.toBytes(tableName)); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); - } + } + } + }; + Runnable schemaChanger = new Runnable() { + public void run() { + HColumnDescriptor hcd = new HColumnDescriptor(newFamily); + try { + admin.addColumn(Bytes.toBytes(tableName), hcd); + } catch (IOException ioe) { + ioe.printStackTrace(); + + } + } + }; + schemaChanger.run(); + Thread.sleep(50); + splitter.run(); + waitForSchemaChangeProcess(tableName, 40000); + + Put put1 = new Put(row); + put1.add(Bytes.toBytes(newFamily), qualifier, value); + LOG.info("******** Put into new column family "); + ht.put(put1); + ht.flushCommits(); + + LOG.info("******** Get from new column family "); + Get get1 = new Get(row); + get1.addColumn(Bytes.toBytes(newFamily), qualifier); + Result r = ht.get(get1); + byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); + LOG.info(" Value put = " + value + " value from table = " + tvalue); + int result = Bytes.compareTo(value, tvalue); + assertEquals(result, 0); + LOG.info("End testConcurrentInstantSchemaChangeAndSplit() "); + } + /** * This test validates two things. One is that the LoadBalancer does not run when a schema * change process is in progress.
The second thing is that it also checks that failed/expired Index: src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (date 1320381532000) +++ src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (revision ) @@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; @@ -95,7 +98,7 @@ private void handleSchemaChanges(List regions) throws IOException { if (instantAction && regions != null && !regions.isEmpty()) { - handleInstantSchemaChanges(regions.size()); + handleInstantSchemaChanges(regions); } else { handleRegularSchemaChanges(regions); } @@ -196,18 +199,74 @@ } } - protected void handleInstantSchemaChanges(int numberOfRegions) { + /** + * Check whether any of the regions from the list of regions is undergoing a split. + * We simply check whether there is an unassigned node for any of the regions and if so + * we return true. + * @param regionInfos regions to check + * @return true if an unassigned node exists for any region, false otherwise (also on ZK error) + */ + private boolean isSplitInProgress(List<HRegionInfo> regionInfos) { + ZooKeeperWatcher zkw = this.masterServices.getZooKeeper(); + for (HRegionInfo hri : regionInfos) { + String node = ZKAssign.getNodeName(zkw, hri.getEncodedName()); + try { + if (ZKUtil.checkExists(zkw, node) != -1) { + LOG.debug("Region " + hri.getRegionNameAsString() + " is unassigned.
Assuming" + + " that it is undergoing a split"); + return true; + } + } catch (KeeperException ke) { + LOG.debug("KeeperException while determining splits in progress.", ke); + // Assume no splits happening? + return false; + } + } + return false; + } + + /** + * Wait for region split transaction in progress (if any). + * An interrupt does not end the wait; the interrupt status is restored on return. + * @param regions regions to check for an in-progress split + * @param status task whose status text is updated while waiting + */ + private void waitForInflightSplit(List<HRegionInfo> regions, MonitoredTask status) { + boolean interrupted = false; + try { + while (isSplitInProgress(regions)) { + try { + status.setStatus("Alter Schema is waiting for split region to complete."); + Thread.sleep(100); + } catch (InterruptedException e) { + // Re-interrupting here would make each later sleep() throw at once; defer it. + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + protected void handleInstantSchemaChanges(List<HRegionInfo> regions) { MonitoredTask status = TaskMonitor.get().createStatus( "Handling alter table request for table = " + tableNameStr); + if (regions == null || regions.isEmpty()) { + LOG.debug("Region size is null or empty. Ignoring alter request."); + return; + } if (canPerformSchemaChange()) { try { // Wait for currently running load balancer to finish. waitForLoadBalancerToComplete(status); + waitForInflightSplit(regions, status); MasterSchemaChangeTracker masterSchemaChangeTracker = this.masterServices.getSchemaChangeTracker(); masterSchemaChangeTracker .createSchemaChangeNode(Bytes.toString(tableName), - numberOfRegions); + regions.size()); while(!masterSchemaChangeTracker.doesSchemaChangeNodeExists( Bytes.toString(tableName))) { try { Index: src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (date 1320381532000) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java (revision ) @@ -317,6 +317,25 @@ this.sleepTimeMillis = sleepTimeMillis; } + /** + * Check whether there are any schema change requests that are in progress now for the given table.
+ * We simply assume that a schema change is in progress if we see a ZK schema node for + * the table. We may revisit for fine grained checks such as check the current alter status + * et al, but it is not required now. + * @param tableName name of the table to look for under the schema znode + * @return true if a schema node exists for the table, false otherwise (also on ZK error) + */ + public boolean isSchemaChangeInProgress(String tableName) { + try { + List<String> schemaChanges = ZKUtil.listChildrenAndWatchThem(this.watcher, + watcher.schemaZNode); + return schemaChanges != null && schemaChanges.contains(tableName); + } catch (KeeperException ke) { + LOG.debug("isSchemaChangeInProgress. " + + "KeeperException while getting current schema change progress.", ke); + return false; + } + } /** * Holds the current alter state for a table. Alter state includes the Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (date 1320381532000) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision ) @@ -2642,9 +2642,35 @@ splitRegion(regionInfo, null); } + /**
+ * Wait for mid-flight schema change requests (if any). + * An interrupt does not end the wait; the interrupt status is restored on return. + * @param tableName name of the table to check for an in-progress schema change + */ + private void waitForSchemaChange(String tableName) { + boolean interrupted = false; + try { + while (schemaChangeTracker.isSchemaChangeInProgress(tableName)) { + try { + LOG.debug("Schema alter is inprogress for table = " + tableName + + " Waiting for alter to complete before a split"); + Thread.sleep(100); + } catch (InterruptedException e) { + // Re-interrupting here would make each later sleep() throw at once; defer it. + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + @Override public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint) throws NotServingRegionException, IOException { + waitForSchemaChange(Bytes.toString(regionInfo.getTableName())); checkOpen(); HRegion region = getRegion(regionInfo.getRegionName()); region.flushcache(); @@ -3364,7 +3390,8 @@ LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString() + " Is = " + htd ); HRegion region = - HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf); + HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf, + this, null); // Add new region to the onlineRegions addToOnlineRegions(region); }