diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 86a24ad..b83c344 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1584,7 +1584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage()); } } - + this.closing.set(true); + status.setStatus("Disabling writes for close"); if (timeoutForWriteLock == null || timeoutForWriteLock == Long.MAX_VALUE) { // block waiting for the lock for closing @@ -1599,8 +1600,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw (InterruptedIOException) new InterruptedIOException().initCause(e); } } - this.closing.set(true); - status.setStatus("Disabling writes for close"); try { if (this.isClosed()) { status.abort("Already got closed by another process"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionReopeningOrSplitRetryOperationsInCoprocessors.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionReopeningOrSplitRetryOperationsInCoprocessors.java new file mode 100644 index 0000000..6310802 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionReopeningOrSplitRetryOperationsInCoprocessors.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(MediumTests.class) +public class TestRegionReopeningOrSplitRetryOperationsInCoprocessors { + + private static final Log LOG = LogFactory + .getLog(TestRegionReopeningOrSplitRetryOperationsInCoprocessors.class); + + private static HBaseTestingUtility util; + private static final byte[] dummy = Bytes.toBytes("dummy"); + private static final byte[] row1 = Bytes.toBytes("r1"); + private static final byte[] row2 = Bytes.toBytes("r2"); + private static final byte[] test = Bytes.toBytes("test"); + private static final CountDownLatch latch = new CountDownLatch(1); + private static boolean isRegionClosing = false; + private static int counter = 0; + private static MiniHBaseCluster miniCluster = null; + + @Rule + public TestName name = new TestName(); + private static TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + util = new HBaseTestingUtility(conf); + miniCluster = util.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + tableName = TableName.valueOf(name.getMethodName()); + } + + private void createTable(String coprocessor) throws IOException { + HTableDescriptor htd = + new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(dummy)) + .addFamily(new HColumnDescriptor(test)) + .addCoprocessor(ScannerOpeningObserver.class.getName()); + util.getAdmin().createTable(htd); + } + + @Test + public void testRegionClosing() throws Exception { + createTable(ScannerOpeningObserver.class.getName()); + + try (final Table t = util.getConnection().getTable(tableName)) { + t.put(new Put(row1).addColumn(test, dummy, dummy)); + Thread scannerThread = new Thread() { + public void run() { + try { + ResultScanner scanner = t.getScanner(new Scan()); + scanner.next(); + } catch (IOException e) { + } + }; + }; + + Thread closingThread = new Thread() { + public void run() { + try { + List regions = util.getAdmin().getRegions(tableName); + List regionServerThreads = miniCluster.getRegionServerThreads(); + for (RegionServerThread rs : regionServerThreads) { + if (rs.getRegionServer().getOnlineRegion(regions.get(0).getRegionName()) == null) { + util.getAdmin().move(regions.get(0).getEncodedNameAsBytes(), + Bytes.toBytes(rs.getRegionServer().getServerName().getServerName())); + break; + } + } + + } catch (IOException e) { + } + }; + }; + scannerThread.start(); + closingThread.start(); + closingThread.join(); + scannerThread.join(); + ResultScanner scanner = t.getScanner(new Scan()); + int count = 0; + for (Result r : scanner) { + count++; + } + assertEquals(2, count); + } + } + + public static class ScannerOpeningObserver implements RegionObserver { + + @Override + public RegionScanner postScannerOpen(ObserverContext e, + Scan scan, RegionScanner s) throws IOException { + final Region region = e.getEnvironment().getRegion(); + region.startRegionOperation(); + try { + while (!isRegionClosing) { + Threads.sleep(100); + } + latch.countDown(); + Threads.sleep(100); + if (counter++ <= 1) { + Thread writingThread = new Thread() { + public void run() { + try { + region.flush(true); + } catch (IOException e) { + } + }; + }; + writingThread.start(); + try { + writingThread.join(); + } catch (InterruptedException e1) { + } + region.put(new Put(row2).addColumn(test, dummy, dummy)); + } + } finally { + region.closeRegionOperation(); + } + return s; + } + + @Override + public void preClose(ObserverContext c, boolean abortRequested) + throws IOException { + try { + isRegionClosing = true; + latch.await(); + } catch (InterruptedException e) { + } + } + + } +}