From 23008921f5a8c300a6ee7043a475601e459fa578 Mon Sep 17 00:00:00 2001
From: Phil Yang
Date: Wed, 7 Sep 2016 20:38:35 +0800
Subject: [PATCH] HBASE-16505 Add AsyncRegion interface to pass deadline and support async operations

---
 .../hbase/ipc/DelegatingHBaseRpcController.java     |  10 ++
 .../hadoop/hbase/ipc/HBaseRpcController.java        |   4 +
 .../hadoop/hbase/ipc/HBaseRpcControllerImpl.java    |  13 ++
 .../org/apache/hadoop/hbase/ipc/CallRunner.java     |   7 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java      |   4 +-
 .../hadoop/hbase/regionserver/AsyncRegion.java      |  99 +++++++++++
 .../apache/hadoop/hbase/regionserver/HRegion.java   | 197 ++++++++++++++++++++-
 .../hbase/regionserver/OperationListener.java       |  33 ++++
 .../apache/hadoop/hbase/regionserver/Region.java    |  19 +-
 .../hbase/regionserver/RegionOperationContext.java  |  77 ++++++++
 .../regionserver/SynchronousOperationListener.java  |  63 +++++++
 11 files changed, 514 insertions(+), 12 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
index 9f9c636..e098499 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -114,6 +114,16 @@ public class DelegatingHBaseRpcController implements HBaseRpcController {
   }
 
   @Override
+  public void setDeadline(long deadline) {
+    delegate.setDeadline(deadline);
+  }
+
+  @Override
+  public long getDeadline() {
+    return delegate.getDeadline();
+  }
+
+  @Override
   public void setFailed(IOException e) {
     delegate.setFailed(e);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 2c4b335..9d87845 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -67,6 +67,10 @@ public interface HBaseRpcController extends RpcController, CellScannable {
 
   boolean hasCallTimeout();
 
+  void setDeadline(long deadline);
+
+  long getDeadline();
+
   /**
    * Set failed with an exception to pass on. For use in async rpc clients
    * @param e exception to set with
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index a976473..7bfc9b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
 
   private IOException exception;
 
+  private long deadline = Long.MAX_VALUE;
+
   /**
    * Priority to set on this request. Set it here in controller so available composing the request.
   * This is the ordained way of setting priorities going forward. We will be undoing the old
@@ -117,6 +119,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
     cellScanner = null;
     exception = null;
     callTimeout = null;
+    deadline = Long.MAX_VALUE;
     // In the implementations of some callable with replicas, rpc calls are executed in a executor
     // and we could cancel the operation from outside which means there could be a race between
     // reset and startCancel. Although I think the race should be handled by the callable since the
@@ -148,6 +151,16 @@
   }
 
   @Override
+  public void setDeadline(long deadline) {
+    this.deadline = deadline;
+  }
+
+  @Override
+  public long getDeadline() {
+    return this.deadline;
+  }
+
+  @Override
   public synchronized String errorText() {
     if (!done || exception == null) {
       return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b2b3c66..7e4ffee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.util.Pair;
@@ -121,7 +122,11 @@ public class CallRunner {
       }
       // make the call
       resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
-          call.timestamp, this.status, call.startTime, call.timeout);
+        call.timestamp, this.status, call.startTime, call.timeout);
+    } catch (TimeoutIOException e) {
+      RpcServer.LOG.info("Timeout while handling request, won't send response to client: " + call,
+        e);
+      return;
     } catch (Throwable e) {
       RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
       errorThrowable = e;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f611796..09549f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -400,7 +400,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     return "callId: " + this.id + " service: " + serviceName +
         " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
         " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
-        " connection: " + connection.toString();
+        " connection: " + connection.toString() +
+        " timeout: " + timeout;
   }
 
   String toTraceString() {
@@ -2207,6 +2208,7 @@
       //get an instance of the method arg type
       HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
       controller.setCallTimeout(timeout);
+      controller.setDeadline(timeout > 0 ? receiveTime + timeout : Long.MAX_VALUE);
       Message result = service.callBlockingMethod(md, controller, param);
       long endTime = System.currentTimeMillis();
       int processingTime = (int) (endTime - startTime);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
new file mode 100644
index 0000000..ab1a9df
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+
+/**
+ * Async version of Region. Supports non-blocking operations and can pass more information into
+ * the operations.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface AsyncRegion extends Region {
+
+  void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock);
+
+  void append(RegionOperationContext<Result> context, Append append, long nonceGroup, long nonce);
+
+  void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
+      long nonceGroup, long nonce);
+
+  void batchReplay(RegionOperationContext<OperationStatus[]> context,
+      WALSplitter.MutationReplay[] mutations, long replaySeqId);
+
+  void checkAndMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
+      byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator,
+      Mutation mutation, boolean writeToWAL);
+
+  void checkAndRowMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
+      byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator,
+      RowMutations mutations, boolean writeToWAL);
+
+  void delete(RegionOperationContext<Void> context, Delete delete);
+
+  void get(RegionOperationContext<Result> context, Get get);
+
+  void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor);
+
+  void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
+      long nonceGroup, long nonce);
+
+  void getScanner(RegionOperationContext<RegionScanner> context, Scan scan);
+
+  void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
+      List<KeyValueScanner> additionalScanners);
+
+  void increment(RegionOperationContext<Result> context, Increment increment, long nonceGroup,
+      long nonce);
+
+  void mutateRow(RegionOperationContext<Void> context, RowMutations mutations);
+
+  void mutateRowsWithLocks(RegionOperationContext<Void> context, Collection<Mutation> mutations,
+      Collection<byte[]> rowsToLock, long nonceGroup, long nonce);
+
+  void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?, ?> processor);
+
+  void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?, ?> processor,
+      long nonceGroup, long nonce);
+
+  void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?, ?> processor,
+      long timeout, long nonceGroup, long nonce);
+
+  void put(RegionOperationContext<Void> context, Put put);
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f97f6b2..f4a23f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -197,7 +198,7 @@ import org.apache.htrace.TraceScope;
 
 @SuppressWarnings("deprecation")
 @InterfaceAudience.Private
-public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
+public class HRegion implements HeapSize, PropagatingConfigurationObserver, AsyncRegion {
   private static final Log LOG = LogFactory.getLog(HRegion.class);
 
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
@@ -5296,6 +5297,200 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @Override
+  public void append(RegionOperationContext<Result> context, Append append, long nonceGroup,
+      long nonce) {
+    try {
+      context.done(append(append, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
+      long nonceGroup, long nonce) {
+    try {
+      context.done(batchMutate(mutations, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void batchReplay(RegionOperationContext<OperationStatus[]> context,
+      MutationReplay[] mutations, long replaySeqId) {
+    try {
+      context.done(batchReplay(mutations, replaySeqId));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void checkAndMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
+      byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+      Mutation mutation, boolean writeToWAL) {
+    try {
+      context.done(
+        checkAndMutate(row, family, qualifier, compareOp, comparator, mutation, writeToWAL));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void checkAndRowMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
+      byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+      RowMutations mutations, boolean writeToWAL) {
+    try {
+      context.done(
+        checkAndRowMutate(row, family, qualifier, compareOp, comparator, mutations, writeToWAL));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void delete(RegionOperationContext<Void> context, Delete delete) {
+    try {
+      delete(delete);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void get(RegionOperationContext<Result> context, Get get) {
+    try {
+      context.done(get(get));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor) {
+    try {
+      context.done(get(get, withCoprocessor));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
+      long nonceGroup, long nonce) {
+    try {
+      context.done(get(get, withCoprocessor, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan) {
+    try {
+      context.done(getScanner(scan));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
+      List<KeyValueScanner> additionalScanners) {
+    try {
+      context.done(getScanner(scan, additionalScanners));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock) {
+    try {
+      context.done(getRowLock(row, readLock));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void increment(RegionOperationContext<Result> context, Increment increment,
+      long nonceGroup, long nonce) {
+    try {
+      context.done(increment(increment, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void mutateRow(RegionOperationContext<Void> context, RowMutations mutations) {
+    try {
+      mutateRow(mutations);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void mutateRowsWithLocks(RegionOperationContext<Void> context,
+      Collection<Mutation> mutations, Collection<byte[]> rowsToLock, long nonceGroup, long nonce) {
+    try {
+      mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void processRowsWithLocks(RegionOperationContext<Void> context,
+      RowProcessor<?, ?> processor) {
+    try {
+      processRowsWithLocks(processor);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void processRowsWithLocks(RegionOperationContext<Void> context,
+      RowProcessor<?, ?> processor, long nonceGroup, long nonce) {
+    try {
+      processRowsWithLocks(processor, nonceGroup, nonce);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void processRowsWithLocks(RegionOperationContext<Void> context,
+      RowProcessor<?, ?> processor, long timeout, long nonceGroup, long nonce) {
+    try {
+      processRowsWithLocks(processor, timeout, nonceGroup, nonce);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void put(RegionOperationContext<Void> context, Put put) {
+    try {
+      put(put);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
   public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
     return lockedRows;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
new file mode 100644
index 0000000..91e113e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface for listeners of AsyncRegion. Normally called when an operation completes.
+ * @param <T> type of result; Void if it has no result.
+ */
+@InterfaceAudience.Private
+public interface OperationListener<T> {
+
+  void success(T result);
+
+  void fail(Throwable t);
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..c04ead2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -282,15 +282,16 @@ public interface Region extends ConfigurationObserver {
   }
 
   /**
-   * Tries to acquire a lock on the given row.
-   * @param waitForLock if true, will block until the lock is available.
-   *        Otherwise, just tries to obtain the lock and returns
-   *        false if unavailable.
-   * @return the row lock if acquired,
-   *   null if waitForLock was false and the lock was not acquired
-   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
-   */
-  RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
+   * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row The row actions will be performed against
+   * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+   *          lock is requested
+   */
+  RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
 
   /**
    * If the given list of row locks is not null, releases all locks.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
new file mode 100644
index 0000000..adf5a59
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.protobuf.RpcController;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+
+@InterfaceAudience.Private
+public class RegionOperationContext<T> {
+
+  private long deadline = Long.MAX_VALUE;
+  private List<OperationListener<T>> listeners = new ArrayList<>();
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  public void setDeadline(long deadline) {
+    this.deadline = deadline;
+  }
+
+  public RegionOperationContext() {
+  }
+
+  public RegionOperationContext(RegionOperationContext<?> context) {
+    this.deadline = context.deadline;
+  }
+
+  public RegionOperationContext(RpcController controller) {
+    if (controller instanceof HBaseRpcController) {
+      this.deadline = ((HBaseRpcController) controller).getDeadline();
+    }
+  }
+
+  public synchronized void addListener(OperationListener<T> listener) {
+    listeners.add(listener);
+  }
+
+  /**
+   * We will call this only in one thread, so no need to lock.
+   */
+  public void error(Throwable error) {
+    for (OperationListener<T> listener : listeners) {
+      listener.fail(error);
+    }
+  }
+
+  /**
+   * We will call this only in one thread, so no need to lock.
+   */
+  public void done(T result) {
+    for (OperationListener<T> listener : listeners) {
+      listener.success(result);
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
new file mode 100644
index 0000000..04545f4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An OperationListener that supports getting the result directly. Temporarily used while
+ * AsyncRegion is not fully non-blocking. When getResult is called, the operation must already
+ * have completed.
+ */
+@InterfaceAudience.Private
+public class SynchronousOperationListener<T> implements OperationListener<T> {
+
+  private T result;
+  private boolean done;
+  private Throwable error;
+
+  @Override
+  public void success(T result) {
+    done = true;
+    this.result = result;
+  }
+
+  @Override
+  public void fail(Throwable error) {
+    done = true;
+    this.error = error;
+  }
+
+  /**
+   * We call this method after calling an operation of AsyncRegion synchronously in the same
+   * thread, so there is no need to lock and success/fail must have been called.
+   */
+  public T getResult() throws IOException {
+    assert done;
+    // Rethrow directly if it is an IOException or an unchecked throwable
+    if (error != null) {
+      Throwables.propagateIfPossible(error, IOException.class);
+      // Wrap in an IOException if it is neither an IOException nor unchecked
+      throw new IOException(error);
+    }
+    return result;
+  }
+}
-- 
2.7.4 (Apple Git-66)
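
Editor's note, not part of the patch: a minimal usage sketch, assuming the generic signatures shown in AsyncRegion above, of how a synchronous caller (for example RSRpcServices) could bridge the callback-style API back to a blocking call while the RPC path remains synchronous. The helper class and method names below are hypothetical.

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;

import com.google.protobuf.RpcController;

/**
 * Hypothetical helper, not part of HBASE-16505: drives the callback-style AsyncRegion API
 * from a caller that still needs a blocking result.
 */
public final class AsyncRegionUsageSketch {

  private AsyncRegionUsageSketch() {
  }

  public static Result get(AsyncRegion region, Get get, RpcController rpcController)
      throws IOException {
    // The deadline set in RpcServer#call (receiveTime + timeout, or Long.MAX_VALUE when the
    // call has no timeout) is copied out of the HBaseRpcController by this constructor.
    RegionOperationContext<Result> context = new RegionOperationContext<>(rpcController);
    SynchronousOperationListener<Result> listener = new SynchronousOperationListener<>();
    context.addListener(listener);
    // HRegion#get(RegionOperationContext, Get) calls done()/error() before returning, so the
    // listener already holds the outcome when this call comes back.
    region.get(context, get);
    // Rethrows the failure as an IOException, or returns the Result on success.
    return listener.getResult();
  }
}

The same pattern applies to the other AsyncRegion operations; once callers consume results through OperationListener instead of return values, the region can complete them on another thread or drop work whose deadline has already passed.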