From e7b1a384b7e497be0a988b1665e33e61611ce7a1 Mon Sep 17 00:00:00 2001 From: Pengyue Li Date: Wed, 31 Aug 2016 17:54:50 -0700 Subject: [PATCH 1/2] HBASE-15165 AsyncProcess can spin wait indefinitly --- .../apache/hadoop/hbase/client/AsyncProcess.java | 34 ++++++++++++++++------ .../hadoop/hbase/client/BufferedMutatorImpl.java | 10 ++++--- .../hadoop/hbase/client/TestAsyncProcess.java | 13 ++++++--- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 5bb0f58..20f0134 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -398,7 +399,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(TableName tableName, final List rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } /** @@ -407,7 +408,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(TableName tableName, final RowAccess rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { + boolean needResults) throws InterruptedIOException, TimeoutIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } /** @@ -416,7 +417,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, List rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { + boolean needResults) throws InterruptedIOException, TimeoutIOException { return submit(pool, tableName, new ListRowAccess(rows), atLeastOne, callback, needResults); } @@ -435,7 +436,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, RowAccess rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { + boolean needResults) throws InterruptedIOException, TimeoutIOException { if (rows.isEmpty()) { return NO_REQS_RESULT; } @@ -503,6 +504,16 @@ class AsyncProcess { } } firstIter = false; + //wait for 10 milliseconds to avoid dead spin + if ( retainedActions.isEmpty() && atLeastOne && (locationErrors == null)) { + try { + synchronized (tasksInProgress) { + tasksInProgress.wait(10); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("#" + id + ", interrupted."); + } + } } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); if (retainedActions.isEmpty()) return NO_REQS_RESULT; @@ -1813,25 +1824,30 @@ class AsyncProcess { @VisibleForTesting /** Waits until all outstanding tasks are done. Used in tests. */ - void waitUntilDone() throws InterruptedIOException { + void waitUntilDone() throws InterruptedIOException, TimeoutIOException { waitForMaximumCurrentTasks(0, null); } /** Wait until the async does not have more than max tasks in progress. */ private void waitForMaximumCurrentTasks(int max, String tableName) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName); } // Break out this method so testable @VisibleForTesting void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id, - String tableName) throws InterruptedIOException { + String tableName) throws InterruptedIOException, TimeoutIOException { + long waitStart = EnvironmentEdgeManager.currentTime(); long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; while ((currentInProgress = tasksInProgress.get()) > max) { + long now = EnvironmentEdgeManager.currentTime(); + if ( now - waitStart > this.timeout ) { + throw new TimeoutIOException("Waiting time is longer than timeout :" + this.timeout); + } if (oldInProgress != currentInProgress) { // Wait for in progress to change. - long now = EnvironmentEdgeManager.currentTime(); + if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" @@ -1895,7 +1911,7 @@ class AsyncProcess { * was called, or AP was created. */ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List failedRows, String tableName) throws InterruptedIOException { + List failedRows, String tableName) throws InterruptedIOException, TimeoutIOException { waitForMaximumCurrentTasks(0, tableName); if (!globalErrors.hasErrors()) { return null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 2d4c8b3..b253f5f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import java.io.IOException; @@ -123,13 +124,13 @@ public class BufferedMutatorImpl implements BufferedMutator { } @Override - public void mutate(Mutation m) throws InterruptedIOException, + public void mutate(Mutation m) throws InterruptedIOException, TimeoutIOException, RetriesExhaustedWithDetailsException { mutate(Arrays.asList(m)); } @Override - public void mutate(List ms) throws InterruptedIOException, + public void mutate(List ms) throws InterruptedIOException, TimeoutIOException, RetriesExhaustedWithDetailsException { if (closed) { @@ -203,7 +204,7 @@ public class BufferedMutatorImpl implements BufferedMutator { } @Override - public synchronized void flush() throws InterruptedIOException, + public synchronized void flush() throws InterruptedIOException, TimeoutIOException, RetriesExhaustedWithDetailsException { // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. @@ -220,6 +221,7 @@ public class BufferedMutatorImpl implements BufferedMutator { */ private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, + TimeoutIOException, RetriesExhaustedWithDetailsException { if (!synchronous && writeAsyncBuffer.isEmpty()) { return; @@ -267,7 +269,7 @@ public class BufferedMutatorImpl implements BufferedMutator { */ @Deprecated public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, - InterruptedIOException { + InterruptedIOException, TimeoutIOException { this.writeBufferSize = writeBufferSize; if (currentWriteBufferSize.get() > writeBufferSize) { flush(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index bcc052d..c20a3d1 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -199,14 +200,14 @@ public class TestAsyncProcess { @Override public AsyncRequestFuture submit(TableName tableName, RowAccess rows, boolean atLeastOne, Callback callback, boolean needResults) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { // We use results in tests to check things, so override to always save them. return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } @Override public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Callback callback, boolean needResults) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { // We use results in tests to check things, so override to always save them. return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } @@ -860,7 +861,7 @@ public class TestAsyncProcess { testTaskCount(ap); } - private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException { + private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException, TimeoutIOException { List puts = new ArrayList<>(); for (int i = 0; i != 3; ++i) { puts.add(createPut(1, true)); @@ -1621,7 +1622,11 @@ public class TestAsyncProcess { } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); - } catch (BrokenBarrierException e) { + } catch (TimeoutIOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } -- 2.8.0-rc2 From a565d88385b2bf7b4755ead18de28eb68ffc95dc Mon Sep 17 00:00:00 2001 From: Pengyue Li Date: Thu, 8 Sep 2016 16:23:09 -0700 Subject: [PATCH 2/2] Trigger test for client side change HBASE-15165 --- hbase-server/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 6cf1bb6..0428571 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -19,6 +19,7 @@ * limitations under the License. */ --> + 4.0.0 hbase -- 2.8.0-rc2