diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 5100314..1bb4416 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -107,9 +107,9 @@ public class ScannerCallable extends RegionServerCallable<Result> {
    * @param connection which connection
    * @param tableName table callable is on
    * @param scan the scan to execute
-   * @param scanMetrics the ScanMetrics to used, if it is null,
+   * @param scanMetrics the ScanMetrics to use; if it is null,
    *          ScannerCallable won't collect metrics
-   * @param rpcControllerFactory factory to use when creating
+   * @param rpcControllerFactory factory to use when creating
    *          {@link com.google.protobuf.RpcController}
    */
   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
@@ -194,6 +194,13 @@ public class ScannerCallable extends RegionServerCallable<Result> {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
+
+    if (controller == null) {
+      controller = controllerFactory.newController();
+      controller.setPriority(getTableName());
+      controller.setCallTimeout(callTimeout);
+    }
+
     if (closed) {
       if (scannerId != -1) {
         close();
@@ -212,9 +219,6 @@ public class ScannerCallable extends RegionServerCallable<Result> {
         RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
           this.scanMetrics != null, renew);
       ScanResponse response = null;
-      controller = controllerFactory.newController();
-      controller.setPriority(getTableName());
-      controller.setCallTimeout(callTimeout);
       try {
         response = getStub().scan(controller, request);
         // Client and RS maintain a nextCallSeq number during the scan. Every next() call
@@ -374,7 +378,7 @@ public class ScannerCallable extends RegionServerCallable<Result> {
       ScanRequest request =
         RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
-        getStub().scan(null, request);
+        getStub().scan(controller, request);
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
       }
@@ -391,7 +395,7 @@ public class ScannerCallable extends RegionServerCallable<Result> {
       getLocation().getRegionInfo().getRegionName(),
       this.scan, 0, false);
     try {
-      ScanResponse response = getStub().scan(null, request);
+      ScanResponse response = getStub().scan(controller, request);
       long id = response.getScannerId();
       if (logScannerActivity) {
         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 44e8322..9fe2cf6 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -421,7 +421,7 @@ public class AsyncRpcChannel {
       requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
     }
-    // Only pass priority if there one. Let zero be same as no priority.
-    if (call.controller.getPriority() != 0) {
+    // Only pass priority if there is one set.
+    if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
       requestHeaderBuilder.setPriority(call.controller.getPriority());
     }
@@ -669,6 +669,7 @@ public class AsyncRpcChannel {
   private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
       final UserGroupInformation user) throws IOException, InterruptedException {
     user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
       public Void run() throws IOException, InterruptedException {
         if (shouldAuthenticateOverKrb()) {
           if (currRetries < MAX_SASL_RETRIES) {
@@ -711,12 +712,12 @@ public class AsyncRpcChannel {
   public int getConnectionHashCode() {
     return ConnectionId.hashCode(ticket, serviceName, address);
   }
-
+  @Override
   public int hashCode() {
     return getConnectionHashCode();
   }
-
+  @Override
   public boolean equals(Object obj) {
     if (obj instanceof AsyncRpcChannel) {
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 734227c..ab5bb2a 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -180,7 +181,7 @@ public class IPCUtil {
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte [] cellBlock)
   throws IOException {
-    return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
+    return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock), 0, cellBlock.length);
   }
 
   /**
@@ -192,7 +193,7 @@
    * @param codec
    * @param compressor
    * @param cellBlock
    * @throws IOException
    */
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
-      final byte [] cellBlock, final int offset, final int length)
+      final ByteBuffer cellBlock, final int offset, final int length)
   throws IOException {
     // If compressed, decompress it first before passing it on else we will leak compression
     // resources if the stream is not closed properly after we let it out.
@@ -202,8 +203,7 @@ public class IPCUtil {
       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
       CompressionInputStream cis =
-        compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
-          poolDecompressor);
+        compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
       ByteBufferOutputStream bbos = null;
       try {
         // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
@@ -221,7 +221,7 @@ public class IPCUtil {
         CodecPool.returnDecompressor(poolDecompressor);
       }
     } else {
-      is = new ByteArrayInputStream(cellBlock, offset, length);
+      is = new ByteBufferInputStream(cellBlock);
     }
     return codec.getDecoder(is);
   }
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index 70f30f9..82634e5 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -35,14 +35,14 @@ import org.apache.hadoop.hbase.TableName;
 @InterfaceAudience.Private
 public class PayloadCarryingRpcController
     extends TimeLimitedRpcController implements CellScannable {
+
+  public static final int PRIORITY_UNSET = -1;
   /**
    * Priority to set on this request. Set it here in controller so available composing the
    * request. This is the ordained way of setting priorities going forward. We will be
    * undoing the old annotation-based mechanism.
    */
-  // Currently only multi call makes use of this. Eventually this should be only way to set
-  // priority.
-  private int priority = HConstants.NORMAL_QOS;
+  private int priority = PRIORITY_UNSET;
 
   /**
    * They are optionally set on construction, cleared after we make the call, and then optionally
@@ -67,6 +67,7 @@ public class PayloadCarryingRpcController
   /**
    * @return One-shot cell scanner (you cannot back it up and restart)
    */
+  @Override
   public CellScanner cellScanner() {
     return cellScanner;
   }
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 1509f54..4321d47 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -895,8 +895,10 @@ public class RpcClientImpl extends AbstractRpcClient {
         cellBlockBuilder.setLength(cellBlock.limit());
         builder.setCellBlockMeta(cellBlockBuilder.build());
       }
-      // Only pass priority if there one. Let zero be same as no priority.
-      if (priority != 0) builder.setPriority(priority);
+      // Only pass priority if there is one set.
+      if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
+        builder.setPriority(priority);
+      }
       RequestHeader header = builder.build();
 
       setupIOstreams();
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5f5c3a3..a7f27ae 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -2459,13 +2458,13 @@
    */
   public static String getRegionEncodedName(
       final RegionSpecifier regionSpecifier) throws DoNotRetryIOException {
-    byte[] value = regionSpecifier.getValue().toByteArray();
+    ByteString value = regionSpecifier.getValue();
     RegionSpecifierType type = regionSpecifier.getType();
     switch (type) {
       case REGION_NAME:
-        return HRegionInfo.encodeRegionName(value);
+        return HRegionInfo.encodeRegionName(value.toByteArray());
       case ENCODED_REGION_NAME:
-        return Bytes.toString(value);
+        return value.toStringUtf8();
       default:
         throw new DoNotRetryIOException(
           "Unsupported region specifier type: " + type);
@@ -2889,7 +2888,7 @@ public final class ProtobufUtil {
     }
     return result;
   }
-
+
   /**
    * Convert a protocol buffer TimeUnit to a client TimeUnit
    * @param proto
@@ -3121,7 +3120,7 @@ public final class ProtobufUtil {
    * @param builder current message builder
    * @param in InputStream containing protobuf data
    * @param size known size of protobuf data
-   * @throws IOException
+   * @throws IOException
    */
   public static void mergeFrom(Message.Builder builder, InputStream in, int size)
       throws IOException {
@@ -3136,7 +3135,7 @@ public final class ProtobufUtil {
    * buffers where the message size is not known
    * @param builder current message builder
    * @param in InputStream containing protobuf data
-   * @throws IOException
+   * @throws IOException
    */
   public static void mergeFrom(Message.Builder builder, InputStream in)
       throws IOException {
@@ -3150,8 +3149,8 @@ public final class ProtobufUtil {
    * This version of protobuf's mergeFrom avoids the hard-coded 64MB limit for decoding
    * buffers when working with ByteStrings
    * @param builder current message builder
-   * @param bs ByteString containing the
-   * @throws IOException
+   * @param bs ByteString containing the protobuf data
+   * @throws IOException
    */
   public static void mergeFrom(Message.Builder builder, ByteString bs) throws IOException {
     final CodedInputStream codedInput = bs.newCodedInput();
@@ -3165,7 +3164,7 @@ public final class ProtobufUtil {
    * buffers when working with byte arrays
    * @param builder current message builder
    * @param b byte array
-   * @throws IOException
+   * @throws IOException
    */
   public static void mergeFrom(Message.Builder builder, byte[] b) throws IOException {
     final CodedInputStream codedInput = CodedInputStream.newInstance(b);
@@ -3191,6 +3190,22 @@ public final class ProtobufUtil {
     codedInput.checkLastTagWas(0);
   }
 
+  public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length)
+      throws IOException {
+    codedInput.resetSizeCounter();
+    int prevLimit = codedInput.setSizeLimit(length);
+
+    int limit = codedInput.pushLimit(length);
+    builder.mergeFrom(codedInput);
+    codedInput.popLimit(limit);
+
+    codedInput.checkLastTagWas(0);
+    codedInput.setSizeLimit(prevLimit);
+    // Reset the counter on the way out too: callers that track a manual offset via
+    // getTotalBytesRead() (see RpcServer.processRequest) must not double-count this message.
+    codedInput.resetSizeCounter();
+  }
+
   public static ReplicationLoadSink toReplicationLoadSink(
       ClusterStatusProtos.ReplicationLoadSink cls) {
     return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index 5b30482..63230ec 100644
--- hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -47,7 +47,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(SmallTests.class)
+@Category(SmallTests.class)
 public class TestIPCUtil {
   private static final Log LOG = LogFactory.getLog(TestIPCUtil.class);
@@ -57,7 +57,7 @@ public class TestIPCUtil {
   public void before() {
     this.util = new IPCUtil(new Configuration());
   }
-
+
   @Test
   public void testBuildCellBlock() throws IOException {
     doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
@@ -78,7 +78,7 @@ public class TestIPCUtil {
     CellScanner cellScanner = sized? getSizedCellScanner(cells):
       CellUtil.createCellScanner(Arrays.asList(cells).iterator());
     ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
-    cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+    cellScanner = util.createCellScanner(codec, compressor, bb, 0, bb.limit());
     int i = 0;
     while (cellScanner.advance()) {
       i++;
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 5e76fff..431087a 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -43,7 +43,7 @@ public class Threads {
   private static final Log LOG = LogFactory.getLog(Threads.class);
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
 
-  private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
+  public static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
     new UncaughtExceptionHandler() {
       @Override
       public void uncaughtException(Thread t, Throwable e) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 0011c2e..43d1120 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -128,6 +129,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.htrace.TraceInfo;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.BlockingService;
@@ -534,10 +536,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return this.size;
     }
 
+    @Override
     public long getResponseCellSize() {
       return responseCellSize;
     }
 
+    @Override
     public void incrementResponseCellSize(long cellSize) {
       responseCellSize += cellSize;
     }
@@ -628,7 +632,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       readPool = Executors.newFixedThreadPool(readThreads,
         new ThreadFactoryBuilder().setNameFormat(
           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
-          ",port=" + port).setDaemon(true).build());
+          ",port=" + port).setDaemon(true)
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
       for (int i = 0; i < readThreads; ++i) {
         Reader reader = new Reader();
         readers[i] = reader;
@@ -902,7 +907,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           throw ieo;
         } catch (Exception e) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
+            LOG.debug(getName() + ": Caught exception while reading", e);
           }
           count = -1; //so that the (count < 0) block is executed
         }
@@ -948,6 +953,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     Responder() throws IOException {
       this.setName("RpcServer.responder");
       this.setDaemon(true);
+      this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
       writeSelector = Selector.open(); // create a selector
     }
@@ -1373,17 +1379,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return authorizedUgi;
     }
 
-    private void saslReadAndProcess(byte[] saslToken) throws IOException,
+    private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
         InterruptedException {
       if (saslContextEstablished) {
         if (LOG.isTraceEnabled())
-          LOG.trace("Have read input token of size " + saslToken.length
+          LOG.trace("Have read input token of size " + saslToken.limit()
              + " for processing by saslServer.unwrap()");
 
        if (!useWrap) {
          processOneRpc(saslToken);
        } else {
-          byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
+          byte[] b = saslToken.array();
+          byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.remaining());
          processUnwrappedData(plaintextData);
        }
      } else {
@@ -1432,10 +1439,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           }
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Have read input token of size " + saslToken.length
+          LOG.debug("Have read input token of size " + saslToken.limit()
              + " for processing by saslServer.evaluateResponse()");
         }
-        replyToken = saslServer.evaluateResponse(saslToken);
+        replyToken = saslServer.evaluateResponse(saslToken.array());
       } catch (IOException e) {
         IOException sendToClient = e;
         Throwable cause = e;
@@ -1631,6 +1638,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         throw new IllegalArgumentException("Unexpected data length "
             + dataLength + "!! from " + getHostAddress());
       }
+
+      // TODO: check dataLength against some limit so that the client cannot OOM the server
       data = ByteBuffer.allocate(dataLength);
 
       // Increment the rpc count. This counter will be decreased when we write
@@ -1660,9 +1669,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
 
       if (useSasl) {
-        saslReadAndProcess(data.array());
+        saslReadAndProcess(data);
       } else {
-        processOneRpc(data.array());
+        processOneRpc(data);
       }
 
     } finally {
@@ -1691,8 +1700,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
 
     // Reads the connection header following version
-    private void processConnectionHeader(byte[] buf) throws IOException {
-      this.connectionHeader = ConnectionHeader.parseFrom(buf);
+    private void processConnectionHeader(ByteBuffer buf) throws IOException {
+      this.connectionHeader = ConnectionHeader.parseFrom(new ByteBufferInputStream(buf));
       String serviceName = connectionHeader.getServiceName();
       if (serviceName == null) throw new EmptyServiceNameException();
       this.service = getService(services, serviceName);
@@ -1806,13 +1815,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         if (unwrappedData.remaining() == 0) {
           unwrappedDataLengthBuffer.clear();
           unwrappedData.flip();
-          processOneRpc(unwrappedData.array());
+          processOneRpc(unwrappedData);
           unwrappedData = null;
         }
       }
     }
 
-    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
+    private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
       if (connectionHeaderRead) {
         processRequest(buf);
       } else {
@@ -1834,16 +1843,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     * @throws IOException
     * @throws InterruptedException
     */
-    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
-      long totalRequestSize = buf.length;
+    protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+      long totalRequestSize = buf.limit();
       int offset = 0;
       // Here we read in the header. We avoid having pb
       // do its default 4k allocation for CodedInputStream. We force it to use backing array.
-      CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
+      CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
       int headerSize = cis.readRawVarint32();
       offset = cis.getTotalBytesRead();
       Message.Builder builder = RequestHeader.newBuilder();
-      ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
+      ProtobufUtil.mergeFrom(builder, cis, headerSize);
       RequestHeader header = (RequestHeader) builder.build();
       offset += headerSize;
       int id = header.getCallId();
@@ -1874,19 +1883,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
         if (md == null) throw new UnsupportedOperationException(header.getMethodName());
         builder = this.service.getRequestPrototype(md).newBuilderForType();
-        // To read the varint, I need an inputstream; might as well be a CIS.
-        cis = CodedInputStream.newInstance(buf, offset, buf.length);
         int paramSize = cis.readRawVarint32();
         offset += cis.getTotalBytesRead();
         if (builder != null) {
-          ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
+          ProtobufUtil.mergeFrom(builder, cis, paramSize);
           param = builder.build();
         }
         offset += paramSize;
       }
       if (header.hasCellBlockMeta()) {
+        buf.position(offset); // CodedInputStream may have consumed more than it should have
         cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
-          buf, offset, buf.length);
+            buf, offset, buf.limit());
       }
     } catch (Throwable t) {
       InetSocketAddress address = getListenerAddress();
@@ -2025,7 +2033,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       final InetSocketAddress bindAddress, Configuration conf,
       RpcScheduler scheduler)
       throws IOException {
-
+
     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
       this.reservoir = new BoundedByteBufferPool(
         conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
@@ -2033,7 +2041,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         // Make the max twice the number of handlers to be safe.
         conf.getInt("hbase.ipc.server.reservoir.initial.max",
           conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2),
+            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2),
         // By default make direct byte buffers from the buffer pool.
         conf.getBoolean("hbase.ipc.server.reservoir.direct.buffer", true));
     } else {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
index cfdbce0..8438378 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -217,10 +216,10 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
     if (param == null) {
       return HConstants.NORMAL_QOS;
     }
-    if (param instanceof MultiRequest) {
-      // The multi call has its priority set in the header. All calls should work this way but
-      // only this one has been converted so far. No priority == NORMAL_QOS.
-      return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
+
+    // Trust the client-set priority if one was set.
+    if (header.hasPriority()) {
+      return header.getPriority();
     }
 
     String cls = param.getClass().getName();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 93a1c17..41c8d17 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5162,6 +5162,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
    * @param readLock is the lock reader or writer. True indicates that a non-exclusive
    *          lock is requested
    */
+  @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
     // Make sure the row is inside of this region before getting the lock for it.
     checkRow(row, "row lock");
@@ -5569,8 +5570,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
     // Here we separate all scanners into two lists - scanner that provide data required
     // by the filter to operate (scanners list) and all others (joinedScanners list).
-    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-    List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
+    List<KeyValueScanner> joinedScanners
+        = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
     if (additionalScanners != null) {
       scanners.addAll(additionalScanners);
     }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index f136071..f95446f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1050,8 +1050,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   Region getRegion(
       final RegionSpecifier regionSpecifier) throws IOException {
-    return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
-      ProtobufUtil.getRegionEncodedName(regionSpecifier));
+    ByteString value = regionSpecifier.getValue();
+    RegionSpecifierType type = regionSpecifier.getType();
+    switch (type) {
+      case REGION_NAME:
+        byte[] regionName = value.toByteArray();
+        String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
+        return regionServer.getRegionByEncodedName(regionName, encodedRegionName);
+      case ENCODED_REGION_NAME:
+        return regionServer.getRegionByEncodedName(value.toStringUtf8());
+      default:
+        throw new DoNotRetryIOException(
+          "Unsupported region specifier type: " + type);
+    }
   }
 
   @VisibleForTesting
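
A few of the techniques in this patch are worth unpacking with small standalone sketches. First, the ScannerCallable change: the controller used to be rebuilt inside each call() while close() and openScanner() passed null, so the scanner's priority and call timeout only ever applied to next() RPCs. Building it lazily once and handing the same instance to openScanner, next and close keeps those settings on every scan RPC. A minimal sketch of the shape, with simplified stand-in types (Controller and ControllerFactory here are illustrative, not the HBase interfaces):

/** Illustrative sketch, not the HBase API: create the controller once, reuse it everywhere. */
public class ScanControllerSketch {
  interface Controller {
    void setPriority(String tableName);
    void setCallTimeout(int millis);
  }

  interface ControllerFactory {
    Controller newController();
  }

  private final ControllerFactory controllerFactory;
  private Controller controller; // shared by the open, next and close RPCs

  public ScanControllerSketch(ControllerFactory controllerFactory) {
    this.controllerFactory = controllerFactory;
  }

  /** Lazily build the controller, as the new block at the top of call() does. */
  Controller controller(String tableName, int callTimeout) {
    if (controller == null) {
      controller = controllerFactory.newController();
      controller.setPriority(tableName);    // previously lost on close(), which passed null
      controller.setCallTimeout(callTimeout);
    }
    return controller;
  }
}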
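
The PayloadCarryingRpcController change is a classic sentinel pattern: 0 is a real priority (HConstants.NORMAL_QOS), so treating 0 as "unset" made an explicitly requested normal priority indistinguishable from no priority at all. That is why both RPC clients above now compare against PRIORITY_UNSET (-1) instead of 0. A self-contained sketch, with an illustrative map standing in for the protobuf RequestHeader builder:

import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch, not the HBase API: distinguish "never set" from "set to 0". */
public class PrioritySentinelSketch {
  public static final int PRIORITY_UNSET = -1; // outside the valid QOS range
  private int priority = PRIORITY_UNSET;

  public void setPriority(int priority) {
    this.priority = priority;
  }

  /** Serialize the field only when the caller actually set it; absent is not the same as 0. */
  public Map<String, Integer> buildHeader() {
    Map<String, Integer> header = new HashMap<String, Integer>();
    if (priority != PRIORITY_UNSET) {
      header.put("priority", priority);
    }
    return header;
  }
}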
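
Several hunks replace byte[] plus ByteArrayInputStream with reads straight out of the request ByteBuffer (IPCUtil.createCellScanner, processConnectionHeader). The adapter that makes this work is an InputStream view over a buffer. A minimal sketch of what such an adapter involves is below; the real org.apache.hadoop.hbase.io.ByteBufferInputStream may differ in details such as mark/reset support:

import java.io.InputStream;
import java.nio.ByteBuffer;

/** Minimal sketch of an InputStream over a ByteBuffer: no copy of the underlying block. */
public class ByteBufferBackedInputStream extends InputStream {
  private final ByteBuffer buf;

  public ByteBufferBackedInputStream(ByteBuffer buf) {
    this.buf = buf;
  }

  @Override
  public int read() {
    return buf.hasRemaining() ? (buf.get() & 0xff) : -1;
  }

  @Override
  public int read(byte[] b, int off, int len) {
    if (!buf.hasRemaining()) {
      return -1;
    }
    int n = Math.min(len, buf.remaining());
    buf.get(b, off, n); // advances the buffer's position; only these n bytes are copied
    return n;
  }

  @Override
  public int available() {
    return buf.remaining();
  }
}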
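
The new ProtobufUtil.mergeFrom(builder, codedInput, length) overload lets processRequest parse the request header and the request param from one CodedInputStream over the buffer's backing array, instead of re-wrapping the array for each message. resetSizeCounter, setSizeLimit, pushLimit, popLimit and checkLastTagWas are all standard protobuf-java CodedInputStream calls. A hedged sketch of the pattern, assuming input laid out as back-to-back <varint length><message> records:

import java.io.IOException;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Message;

final class LengthDelimitedSketch {
  private LengthDelimitedSketch() {
  }

  /** Same shape as the patch's overload: parse exactly {@code length} bytes into the builder. */
  static void mergeFrom(Message.Builder builder, CodedInputStream in, int length)
      throws IOException {
    in.resetSizeCounter();                   // zero the size accounting for this message
    int prevLimit = in.setSizeLimit(length); // replace the 64MB default with this message's size
    int oldLimit = in.pushLimit(length);     // field parsing must stop at the message boundary
    builder.mergeFrom(in);
    in.popLimit(oldLimit);
    in.checkLastTagWas(0);                   // whole message consumed, no trailing fields
    in.setSizeLimit(prevLimit);
    in.resetSizeCounter();                   // leave getTotalBytesRead() clean for the caller
  }

  /** Parse two back-to-back length-delimited messages from one array with one stream. */
  static void parseTwo(byte[] data, Message.Builder first, Message.Builder second)
      throws IOException {
    CodedInputStream cis = CodedInputStream.newInstance(data, 0, data.length);
    mergeFrom(first, cis, cis.readRawVarint32());
    mergeFrom(second, cis, cis.readRawVarint32());
  }
}

Note the trailing resetSizeCounter(): resetSizeCounter() zeroes getTotalBytesRead(), so a caller like processRequest that advances a manual offset with "offset += cis.getTotalBytesRead()" after the next varint would otherwise count the just-parsed message a second time.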
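
The RpcServer changes install Threads.LOGGING_EXCEPTION_HANDLER on the reader pool and the Responder so that a thread killed by an unexpected RuntimeException leaves a log line instead of dying silently. Guava's ThreadFactoryBuilder supports this directly. A runnable sketch (pool size and handler are illustrative); the handler only fires for tasks started via execute(), since submit() routes the exception into the returned Future instead:

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class LoggingHandlerSketch {
  public static void main(String[] args) {
    UncaughtExceptionHandler logOnDeath = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        System.err.println("Thread " + t.getName() + " died unexpectedly: " + e);
      }
    };
    ExecutorService pool = Executors.newFixedThreadPool(2,
        new ThreadFactoryBuilder()
            .setNameFormat("sketch.reader=%d")
            .setDaemon(true)
            .setUncaughtExceptionHandler(logOnDeath) // the line the patch adds to the reader pool
            .build());
    // execute() lets the exception reach the handler; submit() would capture it in a Future.
    pool.execute(new Runnable() {
      @Override
      public void run() {
        throw new IllegalStateException("boom");
      }
    });
    pool.shutdown();
  }
}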
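
Finally, the getRegionEncodedName and RSRpcServices.getRegion rewrites shave a byte[] copy off every RPC that names a region: an ENCODED_REGION_NAME specifier already holds a UTF-8 string, so ByteString.toStringUtf8() decodes it in place where the old code did toByteArray() followed by Bytes.toString(). A sketch of the decode choice, with an illustrative enum and hash standing in for RegionSpecifierType and HRegionInfo.encodeRegionName:

import com.google.protobuf.ByteString;

final class RegionSpecifierSketch {
  enum Type { REGION_NAME, ENCODED_REGION_NAME } // stands in for RegionSpecifierType

  private RegionSpecifierSketch() {
  }

  /** Decode the encoded region name, copying the bytes only when a byte[] is truly needed. */
  static String encodedName(Type type, ByteString value) {
    switch (type) {
      case REGION_NAME:
        // Full region name: hashing it down to the encoded form genuinely needs the array.
        return hashToEncodedName(value.toByteArray());
      case ENCODED_REGION_NAME:
        // Already the encoded form: decode in place, no defensive copy.
        return value.toStringUtf8();
      default:
        throw new IllegalArgumentException("Unsupported region specifier type: " + type);
    }
  }

  // Placeholder for HRegionInfo.encodeRegionName(byte[]) used in the patch above.
  private static String hashToEncodedName(byte[] regionName) {
    return Integer.toHexString(java.util.Arrays.hashCode(regionName));
  }
}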