From 82ccc3a3c7126422d8808ef6d51673ce0e7ff5c7 Mon Sep 17 00:00:00 2001 From: Pengyue Li Date: Wed, 31 Aug 2016 17:54:50 -0700 Subject: [PATCH] HBASE-15165 AsyncProcess can spin wait indefinitly --- .../apache/hadoop/hbase/client/AsyncProcess.java | 30 +++++++++++++++++----- .../hadoop/hbase/client/BufferedMutatorImpl.java | 10 +++++--- .../hadoop/hbase/client/TestAsyncProcess.java | 9 +++++-- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index d699233..c4f8e10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -363,7 +364,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Batch.Callback callback, boolean needResults) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { return submit(null, tableName, rows, atLeastOne, callback, needResults); } @@ -381,7 +382,7 @@ class AsyncProcess { */ public AsyncRequestFuture submit(ExecutorService pool, TableName tableName, List rows, boolean atLeastOne, Batch.Callback callback, - boolean needResults) throws InterruptedIOException { + boolean needResults) throws InterruptedIOException, TimeoutIOException { if (rows.isEmpty()) { return NO_REQS_RESULT; } @@ -445,6 +446,16 @@ class AsyncProcess { it.remove(); } } + //wait for 10 milliseconds to avoid dead spin + if ( retainedActions.isEmpty() && atLeastOne && (locationErrors == null)) { + try { + synchronized (tasksInProgress) { + tasksInProgress.wait(10); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("#" + id + ", interrupted."); + } + } } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); if (retainedActions.isEmpty()) return NO_REQS_RESULT; @@ -1777,25 +1788,30 @@ class AsyncProcess { @VisibleForTesting /** Waits until all outstanding tasks are done. Used in tests. */ - void waitUntilDone() throws InterruptedIOException { + void waitUntilDone() throws InterruptedIOException, TimeoutIOException { waitForMaximumCurrentTasks(0, null); } /** Wait until the async does not have more than max tasks in progress. */ private void waitForMaximumCurrentTasks(int max, String tableName) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName); } // Break out this method so testable @VisibleForTesting void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id, - String tableName) throws InterruptedIOException { + String tableName) throws InterruptedIOException, TimeoutIOException { + long waitStart = EnvironmentEdgeManager.currentTime(); long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; while ((currentInProgress = tasksInProgress.get()) > max) { + long now = EnvironmentEdgeManager.currentTime(); + if ( now - waitStart > this.timeout ) { + throw new TimeoutIOException("Waiting time is longer than timeout :" + this.timeout); + } if (oldInProgress != currentInProgress) { // Wait for in progress to change. - long now = EnvironmentEdgeManager.currentTime(); + if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" @@ -1859,7 +1875,7 @@ class AsyncProcess { * was called, or AP was created. */ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List failedRows, String tableName) throws InterruptedIOException { + List failedRows, String tableName) throws InterruptedIOException, TimeoutIOException { waitForMaximumCurrentTasks(0, tableName); if (!globalErrors.hasErrors()) { return null; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 39e4f75..513294d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import java.io.IOException; @@ -115,13 +116,13 @@ public class BufferedMutatorImpl implements BufferedMutator { } @Override - public void mutate(Mutation m) throws InterruptedIOException, + public void mutate(Mutation m) throws InterruptedIOException, TimeoutIOException, RetriesExhaustedWithDetailsException { mutate(Arrays.asList(m)); } @Override - public void mutate(List ms) throws InterruptedIOException, + public void mutate(List ms) throws InterruptedIOException, TimeoutIOException, RetriesExhaustedWithDetailsException { if (closed) { @@ -190,7 +191,7 @@ public class BufferedMutatorImpl implements BufferedMutator { } @Override - public synchronized void flush() throws InterruptedIOException, + public synchronized void flush() throws InterruptedIOException, TimeoutIOException, RetriesExhaustedWithDetailsException { // As we can have an operation in progress even if the buffer is empty, we call // backgroundFlushCommits at least one time. @@ -207,6 +208,7 @@ public class BufferedMutatorImpl implements BufferedMutator { */ private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, + TimeoutIOException, RetriesExhaustedWithDetailsException { LinkedList buffer = new LinkedList<>(); @@ -271,7 +273,7 @@ public class BufferedMutatorImpl implements BufferedMutator { */ @Deprecated public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, - InterruptedIOException { + InterruptedIOException, TimeoutIOException { this.writeBufferSize = writeBufferSize; if (currentWriteBufferSize.get() > writeBufferSize) { flush(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 5959078..fc9ce92 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -183,7 +184,7 @@ public class TestAsyncProcess { @Override public AsyncRequestFuture submit(TableName tableName, List rows, boolean atLeastOne, Callback callback, boolean needResults) - throws InterruptedIOException { + throws InterruptedIOException, TimeoutIOException { // We use results in tests to check things, so override to always save them. return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } @@ -1154,7 +1155,11 @@ public class TestAsyncProcess { } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); - } catch (BrokenBarrierException e) { + } catch (TimeoutIOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } -- 2.8.0-rc2