From ac7d8eb0bfc61fd34cec631f4f8fab1872905c64 Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Sat, 7 May 2016 14:53:59 +0200 Subject: [PATCH] HBASE-15795 --- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 14 +- .../hbase/ipc/AsyncServerResponseHandler.java | 14 +- .../hadoop/hbase/ipc/BlockingRpcCallback.java | 4 +- .../java/org/apache/hadoop/hbase/ipc/Call.java | 5 +- .../hadoop/hbase/ipc/CallTimeoutException.java | 4 +- .../org/apache/hadoop/hbase/ipc/ConnectionId.java | 14 +- .../hadoop/hbase/ipc/CoprocessorRpcUtils.java | 2 +- .../DelegatingPayloadCarryingRpcController.java | 2 +- .../org/apache/hadoop/hbase/ipc/FailedServers.java | 13 +- .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java | 155 +++++++++------------ .../hbase/ipc/MasterCoprocessorRpcChannel.java | 9 +- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 17 ++- .../ipc/RegionServerCoprocessorRpcChannel.java | 17 +-- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 7 +- .../apache/hadoop/hbase/ipc/RpcClientFactory.java | 7 +- .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 135 +++++++++++------- .../hadoop/hbase/ipc/ServerRpcController.java | 8 +- .../hadoop/hbase/ipc/TimeLimitedRpcController.java | 10 +- 18 files changed, 224 insertions(+), 213 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index ec6332a..c091d1d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -141,7 +141,9 @@ public abstract class AbstractRpcClient implements RpcClient { // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); - if (className == null || className.length() == 0) return null; + if (className == null || className.length() == 0) { + return null; + } try { return (Codec)Class.forName(className).newInstance(); } catch (Exception e) { @@ -161,9 +163,11 @@ public abstract class AbstractRpcClient implements RpcClient { */ private static CompressionCodec getCompressor(final Configuration conf) { String className = conf.get("hbase.client.rpc.compressor", null); - if (className == null || className.isEmpty()) return null; + if (className == null || className.isEmpty()) { + return null; + } try { - return (CompressionCodec)Class.forName(className).newInstance(); + return (CompressionCodec)Class.forName(className).newInstance(); } catch (Exception e) { throw new RuntimeException("Failed getting compressor " + className, e); } @@ -252,8 +256,8 @@ public abstract class AbstractRpcClient implements RpcClient { * will be a * new Connection each time. * @return A pair with the Message response and the Cell data (if any). 
-   * @throws InterruptedException
-   * @throws java.io.IOException
+   * @throws InterruptedException if the call is interrupted
+   * @throws java.io.IOException if the transport fails
    */
   protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index a0928b1..6fcca34 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.protobuf.Message;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.CellScanner;
@@ -25,13 +32,6 @@
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.ipc.RemoteException;
 
-import com.google.protobuf.Message;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
 /**
  * Handles Hbase responses
  */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
index 3aa59c7..0475e58 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.protobuf.RpcCallback;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-import com.google.protobuf.RpcCallback;
-
 /**
  * Simple {@link RpcCallback} implementation providing a
  * {@link java.util.concurrent.Future}-like {@link BlockingRpcCallback#get()} method, which
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 5f90837..73bc0e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,14 +19,15 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
+
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import java.io.IOException;
-
 /** A call waiting for a value.
*/ @InterfaceAudience.Private public class Call { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java index a81e5d1..1e31f72 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java @@ -17,11 +17,11 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import java.io.IOException; - /** * Client-side call timeout */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java index 33fc880..08f8171 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java @@ -17,11 +17,11 @@ */ package org.apache.hadoop.hbase.ipc; +import java.net.InetSocketAddress; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.security.User; -import java.net.InetSocketAddress; - /** * This class holds the address and the user ticket, etc. The client connections * to servers are uniquely identified by <remoteAddress, ticket, serviceName> @@ -58,14 +58,14 @@ public class ConnectionId { @Override public boolean equals(Object obj) { - if (obj instanceof ConnectionId) { - ConnectionId id = (ConnectionId) obj; - return address.equals(id.address) && + if (obj instanceof ConnectionId) { + ConnectionId id = (ConnectionId) obj; + return address.equals(id.address) && ((ticket != null && ticket.equals(id.ticket)) || (ticket == id.ticket)) && this.serviceName == id.serviceName; - } - return false; + } + return false; } @Override // simply use the default Object#hashcode() ? diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcUtils.java index 60044d4..63ff3e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CoprocessorRpcUtils.java @@ -34,7 +34,7 @@ public final class CoprocessorRpcUtils { * We assume that all HBase protobuf services share a common package name * (defined in the .proto files). 
*/ - private static String hbaseServicePackage; + private static final String hbaseServicePackage; static { Descriptors.ServiceDescriptor clientService = ClientProtos.ClientService.getDescriptor(); hbaseServicePackage = clientService.getFullName() diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java index ad4224b..aafd492 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController { - private PayloadCarryingRpcController delegate; + private final PayloadCarryingRpcController delegate; public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) { this.delegate = delegate; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java index 16ec16c..868cdc6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java @@ -17,22 +17,21 @@ */ package org.apache.hadoop.hbase.ipc; +import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.LinkedList; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; -import java.net.InetSocketAddress; -import java.util.Iterator; -import java.util.LinkedList; - /** * A class to manage a list of servers that failed recently. 
 */
 @InterfaceAudience.Private
 public class FailedServers {
-  private final LinkedList<Pair<Long, String>> failedServers = new
-      LinkedList<Pair<Long, String>>();
+  private final LinkedList<Pair<Long, String>> failedServers = new LinkedList<>();
   private final int recheckServersTimeout;
 
   public FailedServers(Configuration conf) {
@@ -45,7 +44,7 @@ public class FailedServers {
    */
   public synchronized void addToFailedServers(InetSocketAddress address) {
     final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
-    failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
+    failedServers.addFirst(new Pair<>(expiry, address.toString()));
   }
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index d98d81d..a87bc8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -17,21 +17,22 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.DataInput;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
@@ -45,10 +46,6 @@
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
-import com.google.common.base.Preconditions;
-import com.google.protobuf.CodedOutputStream;
-import com.google.protobuf.Message;
-
 /**
  * Utility to help ipc'ing.
  */
@@ -83,13 +80,13 @@ public class IPCUtil {
   /**
    * Puts CellScanner Cells into a cell block using passed in codec and/or
    * compressor.
-   * @param codec
-   * @param compressor
-   * @param cellScanner
+   * @param codec to use for encoding
+   * @param compressor to use for encoding
+   * @param cellScanner to encode
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
-   * passed in codec and/or compressor; the returned buffer has been
-   * flipped and is ready for reading.  Use limit to find total size.
-   * @throws IOException
+   *   passed in codec and/or compressor; the returned buffer has been
+   *   flipped and is ready for reading. Use limit to find total size.
+   * @throws IOException if encoding the cells fails
    */
   @SuppressWarnings("resource")
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
@@ -101,26 +98,30 @@ public class IPCUtil {
   /**
    * Puts CellScanner Cells into a cell block using passed in codec and/or
    * compressor.
-   * @param codec
-   * @param compressor
-   * @param cellScanner
+   * @param codec to use for encoding
+   * @param compressor to use for encoding
+   * @param cellScanner to encode
    * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
-   * our own ByteBuffer.
+   *   our own ByteBuffer.
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
-   * passed in codec and/or compressor; the returned buffer has been
-   * flipped and is ready for reading.  Use limit to find total size. If pool was not
-   * null, then this returned ByteBuffer came from there and should be returned to the pool when
-   * done.
-   * @throws IOException
+   *   passed in codec and/or compressor; the returned buffer has been
+   *   flipped and is ready for reading. Use limit to find total size. If pool was not
+   *   null, then this returned ByteBuffer came from there and should be returned to the pool when
+   *   done.
+   * @throws IOException if encoding the cells fails
    */
   @SuppressWarnings("resource")
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner, final BoundedByteBufferPool pool) throws IOException {
-    if (cellScanner == null) return null;
-    if (codec == null) throw new CellScannerButNoCodecException();
+    if (cellScanner == null) {
+      return null;
+    }
+    if (codec == null) {
+      throw new CellScannerButNoCodecException();
+    }
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    ByteBufferOutputStream baos = null;
+    ByteBufferOutputStream baos;
     if (pool != null) {
       ByteBuffer bb = pool.getBuffer();
       bufferSize = bb.capacity();
@@ -137,15 +138,17 @@ public class IPCUtil {
       }
       baos = new ByteBufferOutputStream(bufferSize);
     }
-    OutputStream os = baos;
     Compressor poolCompressor = null;
-    try {
+    try (OutputStream os = baos) {
+      OutputStream os2Compress = os;
       if (compressor != null) {
-        if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
+        if (compressor instanceof Configurable) {
+          ((Configurable) compressor).setConf(this.conf);
+        }
         poolCompressor = CodecPool.getCompressor(compressor);
-        os = compressor.createOutputStream(os, poolCompressor);
+        os2Compress = compressor.createOutputStream(os, poolCompressor);
       }
-      Codec.Encoder encoder = codec.getEncoder(os);
+      Codec.Encoder encoder = codec.getEncoder(os2Compress);
       int count = 0;
       while (cellScanner.advance()) {
         encoder.write(cellScanner.current());
@@ -154,12 +157,15 @@ public class IPCUtil {
       encoder.flush();
       // If no cells, don't mess around. Just return null (could be a bunch of existence checking
      // gets or something -- stuff that does not return a cell).
-      if (count == 0) return null;
+      if (count == 0) {
+        return null;
+      }
     } catch (BufferOverflowException e) {
       throw new DoNotRetryIOException(e);
     } finally {
-      os.close();
-      if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
+      if (poolCompressor != null) {
+        CodecPool.returnCompressor(poolCompressor);
+      }
     }
     if (LOG.isTraceEnabled()) {
       if (bufferSize < baos.size()) {
@@ -171,10 +177,10 @@ public class IPCUtil {
   }
 
   /**
-   * @param codec
-   * @param cellBlock
+   * @param codec to use for cellblock
+   * @param cellBlock to decode
    * @return CellScanner to work against the content of cellBlock
-   * @throws IOException
+   * @throws IOException if decoding fails
    */
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte[] cellBlock) throws IOException {
@@ -191,12 +197,12 @@ public class IPCUtil {
   }
 
   /**
-   * @param codec
+   * @param codec to use for cellblock
    * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
-   * position()'ed at the start of the cell block and limit()'ed at the end.
+   *   position()'ed at the start of the cell block and limit()'ed at the end.
    * @return CellScanner to work against the content of cellBlock.
-   * All cells created out of the CellScanner will share the same ByteBuffer being passed.
-   * @throws IOException
+   *   All cells created out of the CellScanner will share the same ByteBuffer being passed.
+   * @throws IOException if cell decoding fails
    */
   public CellScanner createCellScannerReusingBuffers(final Codec codec,
       final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
@@ -212,11 +218,13 @@ public class IPCUtil {
   private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
       throws IOException {
     // GZIPCodec fails w/ NPE if no configuration.
-    if (compressor instanceof Configurable) ((Configurable) compressor).setConf(this.conf);
+    if (compressor instanceof Configurable) {
+      ((Configurable) compressor).setConf(this.conf);
+    }
     Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
     CompressionInputStream cis =
         compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
-    ByteBufferOutputStream bbos = null;
+    ByteBufferOutputStream bbos;
     try {
       // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
       // TODO: Reuse buffers.
@@ -232,33 +240,13 @@ public class IPCUtil {
   }
 
   /**
-   * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its
-   * serialization.
-   * @return The passed in Message serialized with delimiter. Return null if m is null
-   * @throws IOException
-   */
-  public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
-    if (m == null) return null;
-    int serializedSize = m.getSerializedSize();
-    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
-    byte [] buffer = new byte[serializedSize + vintSize];
-    // Passing in a byte array saves COS creating a buffer which it does when using streams.
-    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
-    // This will write out the vint preamble and the message serialized.
-    cos.writeMessageNoTag(m);
-    cos.flush();
-    cos.checkNoSpaceLeft();
-    return ByteBuffer.wrap(buffer);
-  }
-
-  /**
    * Write out header, param, and cell block if there is one.
-   * @param dos
-   * @param header
-   * @param param
-   * @param cellBlock
+   * @param dos Stream to write into
+   * @param header to write
+   * @param param to write
+   * @param cellBlock to write
    * @return Total number of bytes written.
-   * @throws IOException
+   * @throws IOException if write action fails
    */
   public static int write(final OutputStream dos, final Message header, final Message param,
     final ByteBuffer cellBlock)
   throws IOException {
     // Must calculate total size and write that first so other side can read it all in in one
     // swoop. This is dictated by how the server is currently written. Server needs to change
     // if we are to be able to write without the length prefixing.
     int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
-    if (cellBlock != null) totalSize += cellBlock.remaining();
+    if (cellBlock != null) {
+      totalSize += cellBlock.remaining();
+    }
     return write(dos, header, param, cellBlock, totalSize);
   }
 
@@ -278,36 +268,25 @@ public class IPCUtil {
     dos.write(Bytes.toBytes(totalSize));
     // This allocates a buffer that is the size of the message internally.
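+    // writeDelimitedTo prefixes each message with a protobuf varint holding its
+    // serialized size, so the receiver can frame the header and the optional param
+    // with no separators beyond the 4-byte total length written just above.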
header.writeDelimitedTo(dos); - if (param != null) param.writeDelimitedTo(dos); - if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); + if (param != null) { + param.writeDelimitedTo(dos); + } + if (cellBlock != null) { + dos.write(cellBlock.array(), 0, cellBlock.remaining()); + } dos.flush(); return totalSize; } /** - * Read in chunks of 8K (HBASE-7239) - * @param in - * @param dest - * @param offset - * @param len - * @throws IOException - */ - public static void readChunked(final DataInput in, byte[] dest, int offset, int len) - throws IOException { - int maxRead = 8192; - - for (; offset < len; offset += maxRead) { - in.readFully(dest, offset, Math.min(len - offset, maxRead)); - } - } - - /** * @return Size on the wire when the two messages are written with writeDelimitedTo */ public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { int totalSize = 0; for (Message m: messages) { - if (m == null) continue; + if (m == null) { + continue; + } totalSize += m.getSerializedSize(); totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index 68798ed..6fae5cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -15,9 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + import java.io.IOException; import org.apache.commons.logging.Log; @@ -29,10 +32,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; - /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s * against the active master. An instance of this class may be obtained diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 4d3a453..55d6375 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -15,9 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + import java.io.IOException; import org.apache.commons.logging.Log; @@ -32,10 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.util.Bytes; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; - /** * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s * against a given table region. An instance of this class may be obtained @@ -52,10 +51,10 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { private final TableName table; private final byte[] row; private byte[] lastRegion; - private int operationTimeout; + private final int operationTimeout; - private RpcRetryingCallerFactory rpcCallerFactory; - private RpcControllerFactory rpcControllerFactory; + private final RpcRetryingCallerFactory rpcCallerFactory; + private final RpcControllerFactory rpcControllerFactory; /** * Constructor @@ -105,7 +104,7 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { }; CoprocessorServiceResponse result = rpcCallerFactory. newCaller() .callWithRetries(callable, operationTimeout); - Message response = null; + Message response; if (result.getValue().hasValue()) { Message.Builder builder = responsePrototype.newBuilderForType(); ProtobufUtil.mergeFrom(builder, result.getValue().getValue()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java index 6ffb579..c23d36c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionServerCoprocessorRpcChannel.java @@ -11,28 +11,29 @@ package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; /** * Provides clients with an RPC connection to call coprocessor endpoint * {@link com.google.protobuf.Service}s against a given region server. An instance of this class may - * be obtained by calling {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)}, - * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to - * call the endpoint methods. 
+ * be obtained by calling
+ * {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)}, but should
+ * normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the
+ * endpoint methods.
  * @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)
  */
 @InterfaceAudience.Private
@@ -59,7 +60,7 @@ public class RegionServerCoprocessorRpcChannel extends SyncCoprocessorRpcChannel
     // TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
     CoprocessorServiceResponse result =
         ProtobufUtil.execRegionServerService(controller, connection.getClient(serverName), call);
-    Message response = null;
+    Message response;
     if (result.getValue().hasValue()) {
       Message.Builder builder = responsePrototype.newBuilderForType();
       ProtobufUtil.mergeFrom(builder, result.getValue().getValue());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index c3f4d02..f77f1ec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.BlockingRpcChannel;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.security.User;
 
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+
 /**
  * Interface for RpcClient implementations so ConnectionManager can handle it.
  */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 822daca..07254e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.net.SocketAddress;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
-import java.net.SocketAddress;
-
 /**
  * Factory to create a {@link org.apache.hadoop.hbase.ipc.RpcClient}
 */
@@ -74,7 +75,7 @@ public final class RpcClientFactory {
     return ReflectionUtils.instantiateWithCustomCtor(
         rpcClientClass,
         new Class[] { Configuration.class, String.class, SocketAddress.class,
-          MetricsConnection.class },
+            MetricsConnection.class },
         new Object[] { conf, clusterId, localAddr, metrics }
     );
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 83d4adf..7b2500c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1,5 +1,4 @@
 /**
- *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -16,9 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.ipc; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import com.google.protobuf.RpcCallback; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.Closeable; @@ -96,12 +100,6 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; -import com.google.protobuf.RpcCallback; - /** * Does RPC against a cluster. Manages connections per regionserver in the cluster. *

See HBaseServer @@ -160,25 +158,25 @@ public class RpcClientImpl extends AbstractRpcClient { * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ protected class Connection extends Thread { - private ConnectionHeader header; // connection header + private final ConnectionHeader header; // connection header protected ConnectionId remoteId; protected Socket socket = null; // connected socket protected DataInputStream in; protected DataOutputStream out; - private Object outLock = new Object(); - private InetSocketAddress server; // server ip:port + private final Object outLock = new Object(); + private final InetSocketAddress server; // server ip:port private String serverPrincipal; // server's krb5 principal name private AuthMethod authMethod; // authentication method private boolean useSasl; private Token token; private HBaseSaslRpcClient saslRpcClient; - private int reloginMaxBackoff; // max pause before relogin on sasl failure + private final int reloginMaxBackoff; // max pause before relogin on sasl failure private final Codec codec; private final CompressionCodec compressor; // currently active calls protected final ConcurrentSkipListMap calls = - new ConcurrentSkipListMap(); + new ConcurrentSkipListMap<>(); protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); protected final CallSender callSender; @@ -228,7 +226,7 @@ public class RpcClientImpl extends AbstractRpcClient { CallSender(String name, Configuration conf) { int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000); - callsToWrite = new ArrayBlockingQueue(queueSize); + callsToWrite = new ArrayBlockingQueue<>(queueSize); setDaemon(true); setName(name + " - writer"); } @@ -438,21 +436,27 @@ public class RpcClientImpl extends AbstractRpcClient { socket.getOutputStream().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed - if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); + if (LOG.isTraceEnabled()){ + LOG.trace("ignored", ignored); + } } try { if (socket.getInputStream() != null) { socket.getInputStream().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed - if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); + if (LOG.isTraceEnabled()){ + LOG.trace("ignored", ignored); + } } try { if (socket.getChannel() != null) { socket.getChannel().close(); } } catch (IOException ignored) { // Can happen if the socket is already closed - if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored); + if (LOG.isTraceEnabled()){ + LOG.trace("ignored", ignored); + } } try { socket.close(); @@ -665,8 +669,8 @@ public class RpcClientImpl extends AbstractRpcClient { return null; } else { String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; + UserGroupInformation.getLoginUser().getUserName() + + " to " + serverPrincipal; LOG.warn(msg); throw (IOException) new IOException(msg).initCause(ex); } @@ -735,7 +739,9 @@ public class RpcClientImpl extends AbstractRpcClient { } } boolean continueSasl; - if (ticket == null) throw new FatalConnectionException("ticket/user is null"); + if (ticket == null){ + throw new FatalConnectionException("ticket/user is null"); + } try { continueSasl = ticket.doAs(new PrivilegedExceptionAction() { @Override @@ -868,11 +874,8 @@ public class RpcClientImpl extends AbstractRpcClient { } protected void tracedWriteRequest(Call call, int priority, Span span) throws 
IOException { - TraceScope ts = Trace.continueSpan(span); - try { + try (TraceScope ignored = Trace.continueSpan(span)) { writeRequest(call, priority, span); - } finally { - ts.close(); } } @@ -903,7 +906,7 @@ public class RpcClientImpl extends AbstractRpcClient { if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { builder.setPriority(priority); } - RequestHeader header = builder.build(); + RequestHeader requestHeader = builder.build(); setupIOstreams(); @@ -913,13 +916,15 @@ public class RpcClientImpl extends AbstractRpcClient { checkIsOpen(); IOException writeException = null; synchronized (this.outLock) { - if (Thread.interrupted()) throw new InterruptedIOException(); + if (Thread.interrupted()){ + throw new InterruptedIOException(); + } calls.put(call.id, call); // We put first as we don't want the connection to become idle. checkIsOpen(); // Now we're checking that it didn't became idle in between. try { - call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param, + call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, requestHeader, call.param, cellBlock)); } catch (IOException e) { // We set the value inside the synchronized block, this way the next in line @@ -941,7 +946,9 @@ public class RpcClientImpl extends AbstractRpcClient { doNotify(); // Now that we notified, we can rethrow the exception if any. Otherwise we're good. - if (writeException != null) throw writeException; + if (writeException != null){ + throw writeException; + } } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", @@ -956,7 +963,9 @@ public class RpcClientImpl extends AbstractRpcClient { * Because only one receiver, so no synchronization on in. */ protected void readResponse() { - if (shouldCloseConnection.get()) return; + if (shouldCloseConnection.get()){ + return; + } Call call = null; boolean expectedCall = false; try { @@ -1015,12 +1024,16 @@ public class RpcClientImpl extends AbstractRpcClient { EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); } } catch (IOException e) { - if (expectedCall) call.setException(e); + if (expectedCall){ + call.setException(e); + } if (e instanceof SocketTimeoutException) { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. - if (LOG.isTraceEnabled()) LOG.trace("ignored", e); + if (LOG.isTraceEnabled()){ + LOG.trace("ignored", e); + } } else { // Treat this as a fatal condition and close this connection markClosed(e); @@ -1054,7 +1067,9 @@ public class RpcClientImpl extends AbstractRpcClient { } protected synchronized boolean markClosed(IOException e) { - if (e == null) throw new NullPointerException(); + if (e == null){ + throw new NullPointerException(); + } boolean ret = shouldCloseConnection.compareAndSet(false, true); if (ret) { @@ -1124,7 +1139,7 @@ public class RpcClientImpl extends AbstractRpcClient { super(conf, clusterId, localAddr, metrics); this.socketFactory = factory; - this.connections = new PoolMap(getPoolType(conf), getPoolSize(conf)); + this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); this.failedServers = new FailedServers(conf); } @@ -1156,8 +1171,12 @@ public class RpcClientImpl extends AbstractRpcClient { * using this client. 
 */
  @Override
  public void close() {
-    if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
-    if (!running.compareAndSet(true, false)) return;
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Stopping rpc client");
+    }
+    if (!running.compareAndSet(true, false)){
+      return;
+    }
     Set<Connection> connsToClose = null;
     // wake up all connections
@@ -1172,7 +1191,7 @@ public class RpcClientImpl extends AbstractRpcClient {
         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
         if (!conn.isAlive()) {
           if (connsToClose == null) {
-            connsToClose = new HashSet<Connection>();
+            connsToClose = new HashSet<>();
           }
           connsToClose.add(conn);
         }
@@ -1207,8 +1226,8 @@ public class RpcClientImpl extends AbstractRpcClient {
   * {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
   * new Connection each time.
   * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException
-   * @throws IOException
+   * @throws InterruptedException if the call is interrupted
+   * @throws IOException if something fails on the connection
   */
  @Override
  protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
@@ -1228,17 +1247,17 @@ public class RpcClientImpl extends AbstractRpcClient {
     final CallFuture cts;
     if (connection.callSender != null) {
       cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
-        pcrc.notifyOnCancel(new RpcCallback<Object>() {
-          @Override
-          public void run(Object parameter) {
-            connection.callSender.remove(cts);
-          }
-        });
-        if (pcrc.isCanceled()) {
-          // To finish if the call was cancelled before we set the notification (race condition)
-          call.callComplete();
-          return new Pair<Message, CellScanner>(call.response, call.cells);
+      pcrc.notifyOnCancel(new RpcCallback<Object>() {
+        @Override
+        public void run(Object parameter) {
+          connection.callSender.remove(cts);
         }
+      });
+      if (pcrc.isCanceled()) {
+        // To finish if the call was cancelled before we set the notification (race condition)
+        call.callComplete();
+        return new Pair<>(call.response, call.cells);
+      }
     } else {
       cts = null;
       connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
     }
 
     while (!call.done) {
       if (call.checkAndSetTimeout()) {
-        if (cts != null) connection.callSender.remove(cts);
+        if (cts != null){
+          connection.callSender.remove(cts);
+        }
         break;
       }
       if (connection.shouldCloseConnection.get()) {
@@ -1255,12 +1276,16 @@ public class RpcClientImpl extends AbstractRpcClient {
       }
       try {
         synchronized (call) {
-          if (call.done) break;
+          if (call.done){
+            break;
+          }
           call.wait(Math.min(call.remainingTime(), 1000) + 1);
         }
       } catch (InterruptedException e) {
         call.setException(new InterruptedIOException());
-        if (cts != null) connection.callSender.remove(cts);
+        if (cts != null) {
+          connection.callSender.remove(cts);
+        }
         throw e;
       }
     }
@@ -1274,7 +1299,7 @@ public class RpcClientImpl extends AbstractRpcClient {
       throw wrapException(addr, call.error);
     }
 
-    return new Pair<Message, CellScanner>(call.response, call.cells);
+    return new Pair<>(call.response, call.cells);
   }
 
@@ -1303,12 +1328,14 @@ public class RpcClientImpl extends AbstractRpcClient {
   }
 
   /**
-   * Get a connection from the pool, or create a new one and add it to the
+   * Get a connection from the pool, or create a new one and add it to the
*/ protected Connection getConnection(User ticket, Call call, InetSocketAddress addr) throws IOException { - if (!running.get()) throw new StoppedRpcClientException(); + if (!running.get()){ + throw new StoppedRpcClientException(); + } Connection connection; ConnectionId remoteId = new ConnectionId(ticket, call.md.getService().getName(), addr); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java index aa407f7..b899eb8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.util.StringUtils; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - /** * Used for server-side protobuf RPC service invocations. This handler allows * invocation exceptions to easily be passed through to the RPC server from coprocessor @@ -55,7 +55,7 @@ public class ServerRpcController implements RpcController { /** * The exception thrown within * {@link com.google.protobuf.Service#callMethod( - * Descriptors.MethodDescriptor, RpcController, Message, RpcCallback)}, + * Descriptors.MethodDescriptor, RpcController, Message, RpcCallback)} * if any. */ // TODO: it would be good widen this to just Throwable, but IOException is what we allow now diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java index de502cb..cf08ea9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hbase.ipc; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - @InterfaceAudience.Private public class TimeLimitedRpcController implements RpcController { @@ -35,10 +35,10 @@ public class TimeLimitedRpcController implements RpcController { protected volatile Integer callTimeout; protected volatile boolean cancelled = false; protected final AtomicReference> cancellationCb = - new AtomicReference>(null); + new AtomicReference<>(null); protected final AtomicReference> failureCb = - new AtomicReference>(null); + new AtomicReference<>(null); private IOException exception; -- 2.6.4 (Apple Git-63)