commit a134a35754d80c22891297482eeaecbc782d2d72 Author: Enis Soztutar Date: Tue Feb 2 11:57:44 2016 -0800 HBASE-15177 - v2 diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index f6445a6..72d69ec 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -191,6 +191,13 @@ public class ScannerCallable extends RegionServerCallable { if (Thread.interrupted()) { throw new InterruptedIOException(); } + + if (controller == null) { + controller = controllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + } + if (closed) { if (scannerId != -1) { close(); @@ -209,9 +216,6 @@ public class ScannerCallable extends RegionServerCallable { RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew); ScanResponse response = null; - controller = controllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); try { response = getStub().scan(controller, request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call @@ -371,7 +375,7 @@ public class ScannerCallable extends RegionServerCallable { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { - getStub().scan(null, request); + getStub().scan(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -388,7 +392,7 @@ public class ScannerCallable extends RegionServerCallable { getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = getStub().scan(null, request); + ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 69978fc..787aa47 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -412,7 +412,7 @@ public class AsyncRpcChannel { requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there one. Let zero be same as no priority. - if (call.controller.getPriority() != 0) { + if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { requestHeaderBuilder.setPriority(call.controller.getPriority()); } @@ -660,6 +660,7 @@ public class AsyncRpcChannel { private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, final UserGroupInformation user) throws IOException, InterruptedException { user.doAs(new PrivilegedExceptionAction() { + @Override public Void run() throws IOException, InterruptedException { if (shouldAuthenticateOverKrb()) { if (currRetries < MAX_SASL_RETRIES) { @@ -702,12 +703,12 @@ public class AsyncRpcChannel { public int getConnectionHashCode() { return ConnectionId.hashCode(ticket, serviceName, address); } - + @Override public int hashCode() { return getConnectionHashCode(); } - + @Override public boolean equals(Object obj) { if (obj instanceof AsyncRpcChannel) { diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 734227c..ab5bb2a 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; @@ -180,7 +181,7 @@ public class IPCUtil { public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock) throws IOException { - return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); + return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock), 0, cellBlock.length); } /** @@ -192,7 +193,7 @@ public class IPCUtil { * @throws IOException */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock, final int offset, final int length) + final ByteBuffer cellBlock, final int offset, final int length) throws IOException { // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. @@ -202,8 +203,7 @@ public class IPCUtil { if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); CompressionInputStream cis = - compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length), - poolDecompressor); + compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); ByteBufferOutputStream bbos = null; try { // TODO: This is ugly. The buffer will be resized on us if we guess wrong. @@ -221,7 +221,7 @@ public class IPCUtil { CodecPool.returnDecompressor(poolDecompressor); } } else { - is = new ByteArrayInputStream(cellBlock, offset, length); + is = new ByteBufferInputStream(cellBlock); } return codec.getDecoder(is); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 09f4323..f4f18b3 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -35,14 +35,14 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public class PayloadCarryingRpcController extends TimeLimitedRpcController implements CellScannable { + + public static final int PRIORITY_UNSET = -1; /** * Priority to set on this request. Set it here in controller so available composing the * request. This is the ordained way of setting priorities going forward. We will be * undoing the old annotation-based mechanism. */ - // Currently only multi call makes use of this. Eventually this should be only way to set - // priority. - private int priority = HConstants.NORMAL_QOS; + private int priority = PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -67,6 +67,7 @@ public class PayloadCarryingRpcController /** * @return One-shot cell scanner (you cannot back it up and restart) */ + @Override public CellScanner cellScanner() { return cellScanner; } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 940fcd1..83d4adf 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -899,8 +899,10 @@ public class RpcClientImpl extends AbstractRpcClient { cellBlockBuilder.setLength(cellBlock.limit()); builder.setCellBlockMeta(cellBlockBuilder.build()); } - // Only pass priority if there one. Let zero be same as no priority. - if (priority != 0) builder.setPriority(priority); + // Only pass priority if there is one set. + if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { + builder.setPriority(priority); + } RequestHeader header = builder.build(); setupIOstreams(); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 7cd0d91..fe76780 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2430,13 +2430,13 @@ public final class ProtobufUtil { */ public static String getRegionEncodedName( final RegionSpecifier regionSpecifier) throws DoNotRetryIOException { - byte[] value = regionSpecifier.getValue().toByteArray(); + ByteString value = regionSpecifier.getValue(); RegionSpecifierType type = regionSpecifier.getType(); switch (type) { case REGION_NAME: - return HRegionInfo.encodeRegionName(value); + return HRegionInfo.encodeRegionName(value.toByteArray()); case ENCODED_REGION_NAME: - return Bytes.toString(value); + return value.toStringUtf8(); default: throw new DoNotRetryIOException( "Unsupported region specifier type: " + type); @@ -3135,6 +3135,19 @@ public final class ProtobufUtil { codedInput.checkLastTagWas(0); } + public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length) + throws IOException { + codedInput.resetSizeCounter(); + int prevLimit = codedInput.setSizeLimit(length); + + int limit = codedInput.pushLimit(length); + builder.mergeFrom(codedInput); + codedInput.popLimit(limit); + + codedInput.checkLastTagWas(0); + codedInput.setSizeLimit(prevLimit); + } + public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink cls) { return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp()); diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 163be70..743f519 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -58,7 +58,7 @@ public class TestIPCUtil { public void before() { this.util = new IPCUtil(new Configuration()); } - + @Test public void testBuildCellBlock() throws IOException { doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); @@ -79,7 +79,7 @@ public class TestIPCUtil { CellScanner cellScanner = sized? getSizedCellScanner(cells): CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); + cellScanner = util.createCellScanner(codec, compressor, bb, 0, bb.limit()); int i = 0; while (cellScanner.advance()) { i++; diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java new file mode 100644 index 0000000..23b3abb --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Not thread safe! + *

+ * Please note that the reads will cause position movement on wrapped ByteBuffer. + */ +@InterfaceAudience.Private +public class ByteBufferInputStream extends InputStream { + + private ByteBuffer buf; + + public ByteBufferInputStream(ByteBuffer buf) { + this.buf = buf; + } + + /** + * Reads the next byte of data from this input stream. The value byte is returned as an + * int in the range 0 to 255. If no byte is available + * because the end of the stream has been reached, the value -1 is returned. + * @return the next byte of data, or -1 if the end of the stream has been reached. + */ + @Override + public int read() { + if (this.buf.hasRemaining()) { + return (this.buf.get() & 0xff); + } + return -1; + } + + /** + * Reads up to next len bytes of data from buffer into passed array(starting from + * given offset). + * @param b the array into which the data is read. + * @param off the start offset in the destination array b + * @param len the maximum number of bytes to read. + * @return the total number of bytes actually read into the buffer, or -1 if not even + * 1 byte can be read because the end of the stream has been reached. + */ + @Override + public int read(byte b[], int off, int len) { + int avail = available(); + if (avail <= 0) { + return -1; + } + + if (len > avail) { + len = avail; + } + if (len <= 0) { + return 0; + } + + this.buf.get(b, off, len); + return len; + } + + /** + * Skips n bytes of input from this input stream. Fewer bytes might be skipped if the + * end of the input stream is reached. The actual number k of bytes to be skipped is + * equal to the smaller of n and remaining bytes in the stream. + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + */ + @Override + public long skip(long n) { + long k = Math.min(n, available()); + if (k < 0) { + k = 0; + } + this.buf.position((int) (this.buf.position() + k)); + return k; + } + + /** + * @return the number of remaining bytes that can be read (or skipped + * over) from this input stream. + */ + @Override + public int available() { + return this.buf.remaining(); + } +} \ No newline at end of file diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index c366762..30585af 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -45,7 +45,7 @@ public class Threads { private static final Log LOG = LogFactory.getLog(Threads.class); private static final AtomicInteger poolNumber = new AtomicInteger(1); - private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = + public static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a9c64a3..67a5918 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -110,6 +111,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -494,10 +496,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + @Override public long getResponseCellSize() { return responseCellSize; } + @Override public void incrementResponseCellSize(long cellSize) { responseCellSize += cellSize; } @@ -586,7 +590,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + - ",port=" + port).setDaemon(true).build()); + ",port=" + port).setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); for (int i = 0; i < readThreads; ++i) { Reader reader = new Reader(); readers[i] = reader; @@ -863,7 +868,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { throw ieo; } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage()); + LOG.debug(getName() + ": Caught exception while reading:", e); } count = -1; //so that the (count < 0) block is executed } @@ -909,6 +914,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { Responder() throws IOException { this.setName("RpcServer.responder"); this.setDaemon(true); + this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); writeSelector = Selector.open(); // create a selector } @@ -1326,17 +1332,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return authorizedUgi; } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.length + LOG.trace("Have read input token of size " + saslToken.limit() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] b = saslToken.array(); + byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); processUnwrappedData(plaintextData); } } else { @@ -1385,10 +1392,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + replyToken = saslServer.evaluateResponse(saslToken.array()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1584,6 +1591,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } + + // TODO: check dataLength against some limit so that the client cannot OOM the server data = ByteBuffer.allocate(dataLength); // Increment the rpc count. This counter will be decreased when we write @@ -1613,9 +1622,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } } finally { @@ -1644,8 +1653,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + this.connectionHeader = ConnectionHeader.parseFrom( + new ByteBufferInputStream(buf)); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1759,13 +1769,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData.array()); + processOneRpc(unwrappedData); unwrappedData = null; } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1787,16 +1798,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + ProtobufUtil.mergeFrom(builder, cis, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; int id = header.getCallId(); @@ -1827,19 +1838,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); - // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); int paramSize = cis.readRawVarint32(); offset += cis.getTotalBytesRead(); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + ProtobufUtil.mergeFrom(builder, cis, paramSize); param = builder.build(); } offset += paramSize; } if (header.hasCellBlockMeta()) { cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf, offset, buf.limit()); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index cfdbce0..8438378 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; @@ -217,10 +216,10 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction { if (param == null) { return HConstants.NORMAL_QOS; } - if (param instanceof MultiRequest) { - // The multi call has its priority set in the header. All calls should work this way but - // only this one has been converted so far. No priority == NORMAL_QOS. - return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS; + + // Trust the client-set priorities if set + if (header.hasPriority()) { + return header.getPriority(); } String cls = param.getClass().getName(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c93123c..90f3e38 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5177,6 +5177,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param readLock is the lock reader or writer. True indicates that a non-exlcusive * lock is requested */ + @Override public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { // Make sure the row is inside of this region before getting the lock for it. checkRow(row, "row lock"); @@ -5592,8 +5593,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). - List scanners = new ArrayList(); - List joinedScanners = new ArrayList(); + List scanners = new ArrayList(scan.getFamilyMap().size()); + List joinedScanners + = new ArrayList(scan.getFamilyMap().size()); if (additionalScanners != null) { scanners.addAll(additionalScanners); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7eaadc2..3e133c4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1147,8 +1147,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ Region getRegion( final RegionSpecifier regionSpecifier) throws IOException { - return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(), - ProtobufUtil.getRegionEncodedName(regionSpecifier)); + ByteString value = regionSpecifier.getValue(); + RegionSpecifierType type = regionSpecifier.getType(); + switch (type) { + case REGION_NAME: + byte[] regionName = value.toByteArray(); + String encodedRegionName = HRegionInfo.encodeRegionName(regionName); + return regionServer.getRegionByEncodedName(regionName, encodedRegionName); + case ENCODED_REGION_NAME: + return regionServer.getRegionByEncodedName(value.toStringUtf8()); + default: + throw new DoNotRetryIOException( + "Unsupported region specifier type: " + type); + } } @VisibleForTesting