From 958602c77543605448ff7194466c26994d61af20 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Wed, 13 Apr 2016 19:43:14 +0800 Subject: [PATCH] HBASE-15593 Time limit of scanning should be offered by client --- .../hadoop/hbase/client/RpcRetryingCaller.java | 2 +- .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 3 +- .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 1 + .../hadoop/hbase/protobuf/generated/RPCProtos.java | 105 +++++++++++++++++++-- hbase-protocol/src/main/protobuf/RPC.proto | 1 + .../org/apache/hadoop/hbase/ipc/CallRunner.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 42 +++++++-- .../hadoop/hbase/ipc/RpcServerInterface.java | 5 + .../hadoop/hbase/regionserver/RSRpcServices.java | 9 ++ .../regionserver/TestScannerHeartbeatMessages.java | 19 ++-- 10 files changed, 162 insertions(+), 27 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 896222c..b20c6ab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -57,7 +57,7 @@ public class RpcRetryingCaller { /** * Start and end times for a single call. */ - private final static int MIN_RPC_TIMEOUT = 2000; + private final static int MIN_RPC_TIMEOUT = 1; /** How many retries are allowed before we start to log */ private final int startLogErrorsCnt; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 5906708..e86cac6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -423,7 +423,8 @@ public class AsyncRpcChannel { if (call.controller.getPriority() != 0) { requestHeaderBuilder.setPriority(call.controller.getPriority()); } - + requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int)call.rpcTimeout); RPCProtos.RequestHeader rh = requestHeaderBuilder.build(); int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 915b2b5..595dfed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -892,6 +892,7 @@ public class RpcClientImpl extends AbstractRpcClient { } // Only pass priority if there one. Let zero be same as no priority. if (priority != 0) builder.setPriority(priority); + builder.setTimeout(call.timeout); RequestHeader header = builder.build(); setupIOstreams(); diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java index 7758e98..0d358b5 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java @@ -5285,6 +5285,16 @@ public final class RPCProtos { * */ int getPriority(); + + // optional uint32 timeout = 7; + /** + * optional uint32 timeout = 7; + */ + boolean hasTimeout(); + /** + * optional uint32 timeout = 7; + */ + int getTimeout(); } /** * Protobuf type {@code RequestHeader} @@ -5387,6 +5397,11 @@ public final class RPCProtos { priority_ = input.readUInt32(); break; } + case 56: { + bitField0_ |= 0x00000040; + timeout_ = input.readUInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -5600,6 +5615,22 @@ public final class RPCProtos { return priority_; } + // optional uint32 timeout = 7; + public static final int TIMEOUT_FIELD_NUMBER = 7; + private int timeout_; + /** + * optional uint32 timeout = 7; + */ + public boolean hasTimeout() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional uint32 timeout = 7; + */ + public int getTimeout() { + return timeout_; + } + private void initFields() { callId_ = 0; traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance(); @@ -5607,6 +5638,7 @@ public final class RPCProtos { requestParam_ = false; cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance(); priority_ = 0; + timeout_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5638,6 +5670,9 @@ public final class RPCProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt32(6, priority_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeUInt32(7, timeout_); + } getUnknownFields().writeTo(output); } @@ -5671,6 +5706,10 @@ public final class RPCProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(6, priority_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(7, timeout_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5724,6 +5763,11 @@ public final class RPCProtos { result = result && (getPriority() == other.getPriority()); } + result = result && (hasTimeout() == other.hasTimeout()); + if (hasTimeout()) { + result = result && (getTimeout() + == other.getTimeout()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5761,6 +5805,10 @@ public final class RPCProtos { hash = (37 * hash) + PRIORITY_FIELD_NUMBER; hash = (53 * hash) + getPriority(); } + if (hasTimeout()) { + hash = (37 * hash) + TIMEOUT_FIELD_NUMBER; + hash = (53 * hash) + getTimeout(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -5896,6 +5944,8 @@ public final class RPCProtos { bitField0_ = (bitField0_ & ~0x00000010); priority_ = 0; bitField0_ = (bitField0_ & ~0x00000020); + timeout_ = 0; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -5956,6 +6006,10 @@ public final class RPCProtos { to_bitField0_ |= 0x00000020; } result.priority_ = priority_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.timeout_ = timeout_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5992,6 +6046,9 @@ public final class RPCProtos { if (other.hasPriority()) { setPriority(other.getPriority()); } + if (other.hasTimeout()) { + setTimeout(other.getTimeout()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6514,6 +6571,39 @@ public final class RPCProtos { return this; } + // optional uint32 timeout = 7; + private int timeout_ ; + /** + * optional uint32 timeout = 7; + */ + public boolean hasTimeout() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional uint32 timeout = 7; + */ + public int getTimeout() { + return timeout_; + } + /** + * optional uint32 timeout = 7; + */ + public Builder setTimeout(int value) { + bitField0_ |= 0x00000040; + timeout_ = value; + onChanged(); + return this; + } + /** + * optional uint32 timeout = 7; + */ + public Builder clearTimeout() { + bitField0_ = (bitField0_ & ~0x00000040); + timeout_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:RequestHeader) } @@ -7538,16 +7628,17 @@ public final class RPCProtos { "\037\n\rCellBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exce" + "ptionResponse\022\034\n\024exception_class_name\030\001 " + "\001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 \001" + - "(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\246" + + "(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 \001(\010\"\267" + "\001\n\rRequestHeader\022\017\n\007call_id\030\001 \001(\r\022\035\n\ntra" + "ce_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013method_name\030" + "\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022\'\n\017cell_blo" + "ck_meta\030\005 \001(\0132\016.CellBlockMeta\022\020\n\010priorit" + - "y\030\006 \001(\r\"q\n\016ResponseHeader\022\017\n\007call_id\030\001 \001" + - "(\r\022%\n\texception\030\002 \001(\0132\022.ExceptionRespons", - "e\022\'\n\017cell_block_meta\030\003 \001(\0132\016.CellBlockMe" + - "taB<\n*org.apache.hadoop.hbase.protobuf.g" + - "eneratedB\tRPCProtosH\001\240\001\001" + "y\030\006 \001(\r\022\017\n\007timeout\030\007 \001(\r\"q\n\016ResponseHead" + + "er\022\017\n\007call_id\030\001 \001(\r\022%\n\texception\030\002 \001(\0132\022", + ".ExceptionResponse\022\'\n\017cell_block_meta\030\003 " + + "\001(\0132\016.CellBlockMetaB<\n*org.apache.hadoop" + + ".hbase.protobuf.generatedB\tRPCProtosH\001\240\001" + + "\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7589,7 +7680,7 @@ public final class RPCProtos { internal_static_RequestHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RequestHeader_descriptor, - new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", }); + new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "Priority", "Timeout", }); internal_static_ResponseHeader_descriptor = getDescriptor().getMessageTypes().get(6); internal_static_ResponseHeader_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto index a5d60d8..b04249e 100644 --- a/hbase-protocol/src/main/protobuf/RPC.proto +++ b/hbase-protocol/src/main/protobuf/RPC.proto @@ -133,6 +133,7 @@ message RequestHeader { // 0 is NORMAL priority. 200 is HIGH. If no priority, treat it as NORMAL. // See HConstants. optional uint32 priority = 6; + optional uint32 timeout = 7; } message ResponseHeader { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index a0015d2..53fca53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -105,7 +105,7 @@ public class CallRunner { } // make the call resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status); + call.timestamp, this.status, call.timeout); } catch (Throwable e) { RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e); errorThrowable = e; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 19ecd57..d2b7ed2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -257,6 +257,13 @@ public class RpcServer implements RpcServerInterface { private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; + /** + * Minimum allowable timeout (in milliseconds) in rpc request's header. This + * configuration exists to prevent the rpc service regarding this request as timeout immediately. + */ + private static final String MIN_CLIENT_REQUEST_TIMEOUT = "hbase.ipc.min.client.request.timeout"; + private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20; + /** Default value for above params */ private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; @@ -265,6 +272,9 @@ public class RpcServer implements RpcServerInterface { private final int warnResponseTime; private final int warnResponseSize; + + private final int minClientRequestTimeout; + private final Server server; private final List services; @@ -292,6 +302,7 @@ public class RpcServer implements RpcServerInterface { protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null + protected int timeout; /** * Chain of buffers to send as response. */ @@ -310,7 +321,7 @@ public class RpcServer implements RpcServerInterface { Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, - long size, TraceInfo tinfo, final InetAddress remoteAddress) { + long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) { this.id = id; this.service = service; this.md = md; @@ -327,6 +338,7 @@ public class RpcServer implements RpcServerInterface { this.tinfo = tinfo; this.user = connection.user == null? null: userProvider.create(connection.user); this.remoteAddress = remoteAddress; + this.timeout = timeout; } /** @@ -1236,13 +1248,13 @@ public class RpcServer implements RpcServerInterface { private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null, - null); + null, 0); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = - new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null); + new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null, 0); public UserGroupInformation attemptingUser = null; // user name before auth @@ -1638,7 +1650,7 @@ public class RpcServer implements RpcServerInterface { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1799,7 +1811,7 @@ public class RpcServer implements RpcServerInterface { if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { final Call callTooBig = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null); + responder, totalRequestSize, null, null, 0); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); InetSocketAddress address = getListenerAddress(); @@ -1850,7 +1862,7 @@ public class RpcServer implements RpcServerInterface { final Call readParamsFailedCall = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null); + responder, totalRequestSize, null, null, 0); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage()); @@ -1861,8 +1873,12 @@ public class RpcServer implements RpcServerInterface { TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId()) : null; + int timeout = 0; + if (header.hasTimeout()){ + timeout = Math.max(minClientRequestTimeout, header.getTimeout()); + } Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, - totalRequestSize, traceInfo, RpcServer.getRemoteIp()); + totalRequestSize, traceInfo, RpcServer.getRemoteIp(), timeout); scheduler.dispatch(new CallRunner(RpcServer.this, call)); } @@ -1988,6 +2004,8 @@ public class RpcServer implements RpcServerInterface { this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, + + DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); @@ -2107,6 +2125,12 @@ public class RpcServer implements RpcServerInterface { this.secretManager = (SecretManager) secretManager; } + public Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) + throws IOException { + return call(service, md, param, cellScanner, receiveTime, status, 0); + } + /** * This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the @@ -2114,7 +2138,8 @@ public class RpcServer implements RpcServerInterface { */ @Override public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, + int timeout) throws IOException { try { status.setRPC(md.getName(), new Object[]{param}, receiveTime); @@ -2124,6 +2149,7 @@ public class RpcServer implements RpcServerInterface { //get an instance of the method arg type long startTime = System.currentTimeMillis(); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); + controller.setCallTimeout(timeout); Message result = service.callBlockingMethod(md, controller, param); long endTime = System.currentTimeMillis(); int processingTime = (int) (endTime - startTime); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 013d256..12b158d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -52,6 +52,11 @@ public interface RpcServerInterface { Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException, ServiceException; + Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, + int timeout) + throws IOException, ServiceException; + void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 318005b..c561769 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -2415,6 +2416,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; } + if (controller instanceof TimeLimitedRpcController) { + TimeLimitedRpcController timeLimitedRpcController = + (TimeLimitedRpcController)controller; + if (timeLimitedRpcController.getCallTimeout() > 0) { + timeLimitDelta = Math.min(timeLimitDelta, + timeLimitedRpcController.getCallTimeout()); + } + } // Use half of whichever timeout value was more restrictive... But don't allow // the time limit to be less than the allowable minimum (could cause an // immediatate timeout before scanning any data). diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index fbb71d6..3e832f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -105,16 +106,16 @@ public class TestScannerHeartbeatMessages { private static int VALUE_SIZE = 128; private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int SERVER_TIMEOUT = 2000; + // Time, in milliseconds, that the client will wait for a response from the server before timing // out. This value is used server side to determine when it is necessary to send a heartbeat // message to the client - private static int CLIENT_TIMEOUT = 2000; - - // The server limits itself to running for half of the CLIENT_TIMEOUT value. - private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2; + private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3; // By default, at most one row's worth of cells will be retrieved before the time limit is reached - private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2; + private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5; // By default, at most cells for two column families are retrieved before the time limit is // reached private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / NUM_FAMILIES; @@ -127,8 +128,8 @@ public class TestScannerHeartbeatMessages { conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // Check the timeout condition after every cell @@ -143,7 +144,7 @@ public class TestScannerHeartbeatMessages { Table ht = TEST_UTIL.createTable(name, families); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - + ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); return ht; } @@ -285,7 +286,7 @@ public class TestScannerHeartbeatMessages { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { try { - Thread.sleep(SERVER_TIME_LIMIT + 10); + Thread.sleep(CLIENT_TIMEOUT/2 + 10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } -- 2.6.4 (Apple Git-63)