From 54055083bcdc456a79271785721d4e8ed03e5b35 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 11 Jun 2019 17:35:56 +0800 Subject: [PATCH] HBASE-22550 Throw exception when creating thread pool if the connection has already been closed --- .../hbase/client/AsyncConnectionImpl.java | 10 +++--- .../client/ConnectionOverAsyncConnection.java | 14 ++++++--- .../hbase/client/TableOverAsyncTable.java | 11 ++++--- .../hadoop/hbase/client/TestConnection.java | 31 ++++++++++++++++++- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 84e1da635f..78fad9ea23 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -113,7 +114,7 @@ class AsyncConnectionImpl implements AsyncConnection { private ChoreService authService; - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private final Optional metrics; @@ -188,14 +189,12 @@ class AsyncConnectionImpl implements AsyncConnection { @Override public boolean isClosed() { - return closed; + return closed.get(); } @Override public void close() { - // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a - // simple volatile flag. - if (closed) { + if (!closed.compareAndSet(false, true)) { return; } IOUtils.closeQuietly(clusterStatusListener); @@ -209,7 +208,6 @@ class AsyncConnectionImpl implements AsyncConnection { if (c != null) { c.closePool(); } - closed = true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index e66733dc4d..b61cef5c91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -24,10 +24,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -130,7 +132,7 @@ class ConnectionOverAsyncConnection implements Connection { // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call // AsyncConnection.close. - void closePool() { + synchronized void closePool() { ExecutorService batchPool = this.batchPool; if (batchPool != null) { ConnectionUtils.shutdownPool(batchPool); @@ -165,9 +167,12 @@ class ConnectionOverAsyncConnection implements Connection { // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin // interface. - private ExecutorService getBatchPool() { + private ExecutorService getBatchPool() throws IOException { if (batchPool == null) { synchronized (this) { + if (isClosed()) { + throw new DoNotRetryIOException("Connection is closed"); + } if (batchPool == null) { this.batchPool = createThreadPool(); } @@ -182,13 +187,14 @@ class ConnectionOverAsyncConnection implements Connection { @Override public Table build() { - ExecutorService p = pool != null ? pool : getBatchPool(); + IOExceptionSupplier poolSupplier = + pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool; return new TableOverAsyncTable(conn, conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(), - p); + poolSupplier); } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 5686b09c8b..0a2a66eeca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; @@ -76,12 +77,13 @@ class TableOverAsyncTable implements Table { private final AsyncTable table; - private final ExecutorService pool; + private final IOExceptionSupplier poolSupplier; - TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable table, ExecutorService pool) { + TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable table, + IOExceptionSupplier poolSupplier) { this.conn = conn; this.table = table; - this.pool = pool; + this.poolSupplier = poolSupplier; } @Override @@ -423,6 +425,7 @@ class TableOverAsyncTable implements Table { private void coprocssorService(String serviceName, byte[] startKey, byte[] endKey, Callback callback, StubCall call) throws Throwable { // get regions covered by the row range + ExecutorService pool = this.poolSupplier.get(); List keys = getStartKeysInRange(startKey, endKey); Map> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); try { @@ -443,7 +446,7 @@ class TableOverAsyncTable implements Table { } } catch (RejectedExecutionException e) { // maybe the connection has been closed, let's check - if (pool.isShutdown()) { + if (conn.isClosed()) { throw new DoNotRetryIOException("Connection is closed", e); } else { throw new HBaseIOException("Coprocessor operation is rejected", e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java index 8dd4709c1c..df715c2376 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.util.List; import java.util.Set; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -336,7 +342,7 @@ public class TestConnection { TEST_UTIL.getAdmin().createTable(builder.build()); try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); - RegionLocator locator = conn.getRegionLocator(tableName)) { + RegionLocator locator = conn.getRegionLocator(tableName)) { // Get locations of the regions of the table List locations = locator.getAllRegionLocations(); @@ -353,4 +359,27 @@ public class TestConnection { TEST_UTIL.deleteTable(tableName); } } + + @Test(expected = DoNotRetryIOException.class) + public void testClosedConnection() throws ServiceException, Throwable { + byte[] family = Bytes.toBytes("cf"); + TableName tableName = TableName.valueOf(name.getMethodName()); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName) + .setCoprocessor(MultiRowMutationEndpoint.class.getName()) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); + TEST_UTIL.getAdmin().createTable(builder.build()); + + Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + // cache the location + try (Table table = conn.getTable(tableName)) { + table.get(new Get(Bytes.toBytes(0))); + } finally { + conn.close(); + } + Batch.Call callable = service -> { + throw new RuntimeException("Should not arrive here"); + }; + conn.getTable(tableName).coprocessorService(MultiRowMutationService.class, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable); + } } -- 2.17.1