From d549d07f877c3b89bbbb6756a7e9d8832ece82b6 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Fri, 2 Sep 2016 19:01:09 +0800 Subject: [PATCH] HBASE-16505 Add AsyncRegion interface to pass deadline and support async operations --- .../hbase/ipc/PayloadCarryingRpcController.java | 10 ++ .../hadoop/hbase/ipc/IntegrationTestRpcClient.java | 3 +- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 9 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 12 +- .../hadoop/hbase/ipc/RpcServerInterface.java | 5 +- .../hadoop/hbase/regionserver/AsyncRegion.java | 99 +++++++++++ .../apache/hadoop/hbase/regionserver/HRegion.java | 197 ++++++++++++++++++++- .../apache/hadoop/hbase/regionserver/Region.java | 19 +- .../hbase/regionserver/RegionOperationContext.java | 104 +++++++++++ 9 files changed, 440 insertions(+), 18 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index d9877dc..67a8f8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -48,6 +48,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl protected final AtomicReference> cancellationCb = new AtomicReference<>(null); protected final AtomicReference> failureCb = new AtomicReference<>(null); private IOException exception; + private long deadline = Long.MAX_VALUE; public static final int PRIORITY_UNSET = -1; /** @@ -112,6 +113,14 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl return priority; } + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + @Override public void reset() { priority = 0; @@ -121,6 +130,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl failureCb.set(null); cancellationCb.set(null); callTimeout = null; + deadline = Long.MAX_VALUE; } public int getCallTimeout() { diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 28c19ad..4133ba8 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; @@ -88,7 +89,7 @@ public class IntegrationTestRpcClient { @Override public Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { + throws IOException, TimeoutException { return super.call(service, md, param, cellScanner, receiveTime, status); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index b2b3c66..6633be7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; */ import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -121,13 +122,17 @@ public class CallRunner { } // make the call resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status, call.startTime, call.timeout); + call.timestamp, this.status, call.startTime, call.timeout); + } catch (TimeoutException e) { + RpcServer.LOG.info("Timeout while handling request, won't send response to client: " + call, + e); + return; } catch (Throwable e) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); errorThrowable = e; error = StringUtils.stringifyException(e); if (e instanceof Error) { - throw (Error)e; + throw (Error) e; } } finally { if (traceScope != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index c787c98..e19c21e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -58,6 +58,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -400,7 +401,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return "callId: " + this.id + " service: " + serviceName + " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") + " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + - " connection: " + connection.toString(); + " connection: " + connection.toString() + + " timeout: " + timeout; } String toTraceString() { @@ -2186,7 +2188,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { + throws IOException, TimeoutException { return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0); } @@ -2198,7 +2200,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) - throws IOException { + throws IOException, TimeoutException { try { status.setRPC(md.getName(), new Object[]{param}, receiveTime); // TODO: Review after we add in encoded data blocks. @@ -2207,6 +2209,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { //get an instance of the method arg type PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); controller.setCallTimeout(timeout); + controller.setDeadline(timeout > 0 ? receiveTime + timeout : Long.MAX_VALUE); Message result = service.callBlockingMethod(md, controller, param); long endTime = System.currentTimeMillis(); int processingTime = (int) (endTime - startTime); @@ -2257,6 +2260,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (e instanceof LinkageError) throw new DoNotRetryIOException(e); if (e instanceof IOException) throw (IOException)e; + if (e instanceof TimeoutException) { + throw (TimeoutException)e; + } LOG.error("Unexpected throwable object ", e); throw new IOException(e.getMessage(), e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 0388ea4..0a3e6a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -54,11 +55,11 @@ public interface RpcServerInterface { @Deprecated Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException, ServiceException; + throws IOException, ServiceException, TimeoutException; Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, - int timeout) throws IOException, ServiceException; + int timeout) throws IOException, ServiceException, TimeoutException; void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java new file mode 100644 index 0000000..ab1a9df --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.wal.WALSplitter; + +/** + * Async version of Region. Support non-blocking operations and can pass more information into + * the operations. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface AsyncRegion extends Region { + + void getRowLock(RegionOperationContext context, byte[] row, boolean readLock); + + void append(RegionOperationContext context, Append append, long nonceGroup, long nonce); + + void batchMutate(RegionOperationContext context, Mutation[] mutations, + long nonceGroup, long nonce); + + void batchReplay(RegionOperationContext context, WALSplitter.MutationReplay[] mutations, + long replaySeqId); + + void checkAndMutate(RegionOperationContext context, byte [] row, byte [] family, + byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation, + boolean writeToWAL); + + void checkAndRowMutate(RegionOperationContext context, byte [] row, byte [] family, + byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator, + RowMutations mutations, boolean writeToWAL); + + void delete(RegionOperationContext context, Delete delete); + + void get(RegionOperationContext context, Get get); + + void get(RegionOperationContext> context, Get get, boolean withCoprocessor); + + void get(RegionOperationContext> context, Get get, boolean withCoprocessor, + long nonceGroup, long nonce); + + void getScanner(RegionOperationContext context, Scan scan); + + void getScanner(RegionOperationContext context, Scan scan, + List additionalScanners); + + void increment(RegionOperationContext context, Increment increment, long nonceGroup, + long nonce); + + void mutateRow(RegionOperationContext context, RowMutations mutations); + + void mutateRowsWithLocks(RegionOperationContext context, Collection mutations, + Collection rowsToLock, long nonceGroup, long nonce); + + void processRowsWithLocks(RegionOperationContext context, RowProcessor processor); + + void processRowsWithLocks(RegionOperationContext context, RowProcessor processor, + long nonceGroup, long nonce); + + void processRowsWithLocks(RegionOperationContext context, RowProcessor processor, + long timeout, long nonceGroup, long nonce); + + void put(RegionOperationContext context, Put put); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f97f6b2..da41199 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterWrapper; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; @@ -197,7 +198,7 @@ import org.apache.htrace.TraceScope; @SuppressWarnings("deprecation") @InterfaceAudience.Private -public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { +public class HRegion implements HeapSize, PropagatingConfigurationObserver, AsyncRegion { private static final Log LOG = LogFactory.getLog(HRegion.class); public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = @@ -5296,6 +5297,200 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @Override + public void append(RegionOperationContext context, Append append, long nonceGroup, + long nonce) { + try { + context.done(append(append, nonceGroup, nonce)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void batchMutate(RegionOperationContext context, Mutation[] mutations, + long nonceGroup, long nonce) { + try { + context.done(batchMutate(mutations, nonceGroup, nonce)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void batchReplay(RegionOperationContext context, + MutationReplay[] mutations, long replaySeqId) { + try { + context.done(batchReplay(mutations, replaySeqId)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void checkAndMutate(RegionOperationContext context, byte[] row, byte[] family, + byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, + Mutation mutation, boolean writeToWAL) { + try { + context.done( + checkAndMutate(row, family, qualifier, compareOp, comparator, mutation, writeToWAL)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void checkAndRowMutate(RegionOperationContext context, byte[] row, byte[] family, + byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, + RowMutations mutations, boolean writeToWAL) { + try { + context.done( + checkAndRowMutate(row, family, qualifier, compareOp, comparator, mutations, writeToWAL)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void delete(RegionOperationContext context, Delete delete) { + try { + delete(delete); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void get(RegionOperationContext context, Get get) { + try { + context.done(get(get)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void get(RegionOperationContext> context, Get get, boolean withCoprocessor) { + try { + context.done(get(get, withCoprocessor)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void get(RegionOperationContext> context, Get get, boolean withCoprocessor, + long nonceGroup, long nonce) { + try { + context.done(get(get, withCoprocessor, nonceGroup, nonce)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void getScanner(RegionOperationContext context, Scan scan) { + try { + context.done(getScanner(scan)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void getScanner(RegionOperationContext context, Scan scan, + List additionalScanners) { + try { + context.done(getScanner(scan, additionalScanners)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void getRowLock(RegionOperationContext context, byte[] row, boolean readLock) { + try { + context.done(getRowLock(row, readLock)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void increment(RegionOperationContext context, Increment increment, + long nonceGroup, long nonce) { + try { + context.done(increment(increment, nonceGroup, nonce)); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void mutateRow(RegionOperationContext context, RowMutations mutations) { + try { + mutateRow(mutations); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void mutateRowsWithLocks(RegionOperationContext context, + Collection mutations, Collection rowsToLock, long nonceGroup, long nonce) { + try { + mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void processRowsWithLocks(RegionOperationContext context, + RowProcessor processor) { + try { + processRowsWithLocks(processor); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void processRowsWithLocks(RegionOperationContext context, + RowProcessor processor, long nonceGroup, long nonce) { + try { + processRowsWithLocks(processor, nonceGroup, nonce); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void processRowsWithLocks(RegionOperationContext context, + RowProcessor processor, long timeout, long nonceGroup, long nonce) { + try { + processRowsWithLocks(processor, timeout, nonceGroup, nonce); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + + @Override + public void put(RegionOperationContext context, Put put) { + try { + put(put); + context.done(null); + } catch (IOException e) { + context.error(e); + } + } + public ConcurrentHashMap getLockedRows() { return lockedRows; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index efd68b8..c04ead2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -282,15 +282,16 @@ public interface Region extends ConfigurationObserver { } /** - * Tries to acquire a lock on the given row. - * @param waitForLock if true, will block until the lock is available. - * Otherwise, just tries to obtain the lock and returns - * false if unavailable. - * @return the row lock if acquired, - * null if waitForLock was false and the lock was not acquired - * @throws IOException if waitForLock was true and the lock could not be acquired after waiting - */ - RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException; + * + * Get a row lock for the specified row. All locks are reentrant. + * + * Before calling this function make sure that a region operation has already been + * started (the calling thread has already acquired the region-close-guard lock). + * @param row The row actions will be performed against + * @param readLock is the lock reader or writer. True indicates that a non-exlcusive + * lock is requested + */ + RowLock getRowLock(byte[] row, boolean readLock) throws IOException; /** * If the given list of row locks is not null, releases all locks. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java new file mode 100644 index 0000000..a014bea --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.protobuf.RpcController; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; + +@InterfaceAudience.Private +public class RegionOperationContext { + + private long deadline = Long.MAX_VALUE; + private T result; + private boolean done; + private Exception exception; + + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + + public RegionOperationContext() { + } + + public RegionOperationContext(RegionOperationContext context) { + this.deadline = context.deadline; + } + + public RegionOperationContext(RpcController controller) { + if (controller instanceof PayloadCarryingRpcController) { + this.deadline = ((PayloadCarryingRpcController) controller).getDeadline(); + } + } + + public void error(IOException exception) { + this.exception = exception; + done(null); + } + + public void error(TimeoutException exception) { + this.exception = exception; + done(null); + } + + public synchronized void done(T result) { + this.result = result; + this.done = true; + this.notifyAll(); + } + + public synchronized T getResult() throws IOException, TimeoutException { + while (!done) { + long timeout = deadline - System.currentTimeMillis(); + if (timeout <= 0) { + throw new TimeoutException("Region operation timeout"); + } + try { + wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + IOException ioe = new InterruptedIOException(); + ioe.initCause(e); + throw ioe; + } + } + if (exception != null) { + if (exception instanceof IOException) { + throw (IOException)exception; + } + throw (TimeoutException)exception; + } + return result; + } + + public void reset() { + this.result = null; + this.exception = null; + this.done = false; + this.deadline = Long.MAX_VALUE; + } +} -- 2.7.4 (Apple Git-66)