From 9e2979249292533105999c5146e8caa6868e88c8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 12 Feb 2015 21:48:17 +0800 Subject: [PATCH] HBASE-13011 TestLoadIncrementalHFiles is flakey when using AsyncRpcClient as client implementation --- .../org/apache/hadoop/hbase/ipc/AsyncCall.java | 9 +- .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 310 +++++++++------------ .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 27 +- .../hbase/ipc/AsyncServerResponseHandler.java | 16 +- 4 files changed, 165 insertions(+), 197 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index c35238c..68a494d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -72,7 +72,7 @@ public class AsyncCall extends DefaultPromise { this.responseDefaultType = responseDefaultType; this.startTime = EnvironmentEdgeManager.currentTime(); - this.rpcTimeout = controller.getCallTimeout(); + this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0; } /** @@ -84,9 +84,10 @@ public class AsyncCall extends DefaultPromise { return this.startTime; } - @Override public String toString() { - return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" + - (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}"; + @Override + public String toString() { + return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" + + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}"; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 054c9b5..eba71c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; @@ -31,6 +28,23 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import javax.security.sasl.SaslException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; @@ -56,18 +70,9 @@ import org.apache.hadoop.security.token.TokenSelector; import org.apache.htrace.Span; import org.apache.htrace.Trace; -import javax.security.sasl.SaslException; -import java.io.IOException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.TimeUnit; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; /** * Netty RPC channel @@ -97,8 +102,6 @@ public class AsyncRpcChannel { final String serviceName; final InetSocketAddress address; - ConcurrentSkipListMap calls = new ConcurrentSkipListMap<>(); - private int ioFailureCounter = 0; private int connectFailureCounter = 0; @@ -107,16 +110,18 @@ public class AsyncRpcChannel { private int reloginMaxBackoff; private Token token; private String serverPrincipal; - - volatile boolean shouldCloseConnection = false; - private IOException closeException; + + private final Map pendingCalls = new HashMap(); + + private boolean connected = false; + private boolean closed = false; private Timeout cleanupTimer; private final TimerTask timeoutTask = new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { - cleanupTimer = null; - cleanupCalls(false); + @Override + public void run(Timeout timeout) throws Exception { + cleanupCalls(); } }; @@ -213,15 +218,20 @@ public class AsyncRpcChannel { ch.pipeline() .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); ch.pipeline().addLast(new AsyncServerResponseHandler(this)); - try { writeChannelHeader(ch).addListener(new GenericFutureListener() { - @Override public void operationComplete(ChannelFuture future) throws Exception { + @Override + public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { close(future.cause()); return; } - for (AsyncCall call : calls.values()) { + List callsToWrite; + synchronized (pendingCalls) { + connected = true; + callsToWrite = new ArrayList(pendingCalls.values()); + } + for (AsyncCall call : callsToWrite) { writeRequest(call); } } @@ -240,17 +250,18 @@ public class AsyncRpcChannel { */ private SaslClientHandler getSaslHandler(final Bootstrap bootstrap) throws IOException { return new SaslClientHandler(authMethod, token, serverPrincipal, client.fallbackAllowed, - client.conf.get("hbase.rpc.protection", - SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), - new SaslClientHandler.SaslExceptionHandler() { - @Override public void handle(int retryCount, Random random, Throwable cause) { + client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name() + .toLowerCase()), new SaslClientHandler.SaslExceptionHandler() { + @Override + public void handle(int retryCount, Random random, Throwable cause) { try { // Handle Sasl failure. Try to potentially get new credentials handleSaslConnectionFailure(retryCount, cause, ticket.getUGI()); // Try to reconnect AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() { - @Override public void run(Timeout timeout) throws Exception { + @Override + public void run(Timeout timeout) throws Exception { connect(bootstrap); } }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS); @@ -259,10 +270,11 @@ public class AsyncRpcChannel { } } }, new SaslClientHandler.SaslSuccessfulConnectHandler() { - @Override public void onSuccess(Channel channel) { - startHBaseConnection(channel); - } - }); + @Override + public void onSuccess(Channel channel) { + startHBaseConnection(channel); + } + }); } /** @@ -295,66 +307,50 @@ public class AsyncRpcChannel { public Promise callMethod(final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller, final Message request, final Message responsePrototype) { - if (shouldCloseConnection) { - Promise promise = channel.eventLoop().newPromise(); - promise.setFailure(new ConnectException()); - return promise; - } - - final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), - method, request, controller, responsePrototype); - + final AsyncCall call = + new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, + controller, responsePrototype); controller.notifyOnCancel(new RpcCallback() { @Override public void run(Object parameter) { - calls.remove(call.id); + // TODO: do not need to call AsyncCall.setFailed? + synchronized (pendingCalls) { + pendingCalls.remove(call.id); + } } }); + // TODO: this should be handled by PayloadCarryingRpcController. if (controller.isCanceled()) { // To finish if the call was cancelled before we set the notification (race condition) call.cancel(true); return call; } - - calls.put(call.id, call); - - // check again, see https://issues.apache.org/jira/browse/HBASE-12951 - if (shouldCloseConnection) { - Promise promise = channel.eventLoop().newPromise(); - promise.setFailure(new ConnectException()); - return promise; - } - - // Add timeout for cleanup if none is present - if (cleanupTimer == null) { - cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(), - TimeUnit.MILLISECONDS); - } - - if(channel.isActive()) { - writeRequest(call); + + synchronized (pendingCalls) { + if (closed) { + Promise promise = channel.eventLoop().newPromise(); + promise.setFailure(new ConnectException()); + return promise; + } + pendingCalls.put(call.id, call); + // Add timeout for cleanup if none is present + if (cleanupTimer == null && call.getRpcTimeout() > 0) { + cleanupTimer = + AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(), + TimeUnit.MILLISECONDS); + } + if (!connected) { + return call; + } } - + writeRequest(call); return call; } - - /** - * Calls method and returns a promise - * @param method to call - * @param controller to run call with - * @param request to send - * @param responsePrototype for response message - * @return Promise to listen to result - * @throws java.net.ConnectException on connection failures - */ - public Promise callMethodWithPromise( - final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller, - final Message request, final Message responsePrototype) throws ConnectException { - if (shouldCloseConnection || !channel.isOpen()) { - throw new ConnectException(); + + AsyncCall removePendingCall(int id) { + synchronized (pendingCalls) { + return pendingCalls.remove(id); } - - return this.callMethod(method, controller, request, responsePrototype); } /** @@ -400,10 +396,6 @@ public class AsyncRpcChannel { */ private void writeRequest(final AsyncCall call) { try { - if (shouldCloseConnection) { - return; - } - final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader .newBuilder(); requestHeaderBuilder.setCallId(call.id) @@ -439,26 +431,13 @@ public class AsyncRpcChannel { IPCUtil.write(out, rh, call.param, cellBlock); } - channel.writeAndFlush(b).addListener(new CallWriteListener(this,call)); + channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id)); } catch (IOException e) { - if (!shouldCloseConnection) { - close(e); - } + close(e); } } /** - * Fail a call - * - * @param call to fail - * @param cause of fail - */ - void failCall(AsyncCall call, IOException cause) { - calls.remove(call.id); - call.setFailed(cause); - } - - /** * Set up server authorization * * @throws java.io.IOException if auth setup failed @@ -546,22 +525,29 @@ public class AsyncRpcChannel { /** * Close connection - * * @param e exception on close */ public void close(final Throwable e) { - client.removeConnection(ConnectionId.hashCode(ticket,serviceName,address)); + client.removeConnection(this); // Move closing from the requesting thread to the channel thread channel.eventLoop().execute(new Runnable() { @Override public void run() { - if (shouldCloseConnection) { - return; + List toCleanup; + synchronized (pendingCalls) { + if (closed) { + return; + } + closed = true; + toCleanup = new ArrayList(pendingCalls.values()); + pendingCalls.clear(); + if (cleanupTimer != null) { + cleanupTimer.cancel(); + cleanupTimer = null; + } } - - shouldCloseConnection = true; - + IOException closeException = null; if (e != null) { if (e instanceof IOException) { closeException = (IOException) e; @@ -569,16 +555,15 @@ public class AsyncRpcChannel { closeException = new IOException(e); } } - // log the info if (LOG.isDebugEnabled() && closeException != null) { - LOG.debug(name + ": closing ipc connection to " + address + ": " + - closeException.getMessage()); + LOG.debug(name + ": closing ipc connection to " + address, closeException); + } + for (AsyncCall call : toCleanup) { + call.setFailed(closeException != null ? closeException : new ConnectionClosingException( + "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); } - - cleanupCalls(true); channel.disconnect().addListener(ChannelFutureListener.CLOSE); - if (LOG.isDebugEnabled()) { LOG.debug(name + ": closed"); } @@ -591,64 +576,37 @@ public class AsyncRpcChannel { * * @param cleanAll true if all calls should be cleaned, false for only the timed out calls */ - public void cleanupCalls(boolean cleanAll) { - // Cancel outstanding timers - if (cleanupTimer != null) { - cleanupTimer.cancel(); - cleanupTimer = null; - } - - if (cleanAll) { - for (AsyncCall call : calls.values()) { - synchronized (call) { - // Calls can be done on another thread so check before failing them - if(!call.isDone()) { - if (closeException == null) { - failCall(call, new ConnectionClosingException("Call id=" + call.id + - " on server " + address + " aborted: connection is closing")); - } else { - failCall(call, closeException); - } - } - } - } - } else { - for (AsyncCall call : calls.values()) { - long waitTime = EnvironmentEdgeManager.currentTime() - call.getStartTime(); + private void cleanupCalls() { + List toCleanup = new ArrayList(); + long currentTime = EnvironmentEdgeManager.currentTime(); + long nextCleanupTaskDelay = -1L; + synchronized (pendingCalls) { + for (Iterator iter = pendingCalls.values().iterator(); iter.hasNext();) { + AsyncCall call = iter.next(); long timeout = call.getRpcTimeout(); - if (timeout > 0 && waitTime >= timeout) { - synchronized (call) { - // Calls can be done on another thread so check before failing them - if (!call.isDone()) { - closeException = new CallTimeoutException("Call id=" + call.id + - ", waitTime=" + waitTime + ", rpcTimeout=" + timeout); - failCall(call, closeException); + if (timeout > 0) { + if (currentTime - call.getStartTime() >= timeout) { + iter.remove(); + toCleanup.add(call); + } else { + if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) { + nextCleanupTaskDelay = timeout; } } - } else { - // We expect the call to be ordered by timeout. It may not be the case, but stopping - // at the first valid call allows to be sure that we still have something to do without - // spending too much time by reading the full list. - break; } } - - if (!calls.isEmpty()) { - AsyncCall firstCall = calls.firstEntry().getValue(); - - final long newTimeout; - long maxWaitTime = EnvironmentEdgeManager.currentTime() - firstCall.getStartTime(); - if (maxWaitTime < firstCall.getRpcTimeout()) { - newTimeout = firstCall.getRpcTimeout() - maxWaitTime; - } else { - newTimeout = 0; - } - - closeException = null; - cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, - newTimeout, TimeUnit.MILLISECONDS); + if (nextCleanupTaskDelay > 0) { + cleanupTimer = + AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay, + TimeUnit.MILLISECONDS); + } else { + cleanupTimer = null; } } + for (AsyncCall call : toCleanup) { + call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime=" + + (currentTime - call.getRpcTimeout()) + ", rpcTimeout=" + call.getRpcTimeout())); + } } /** @@ -745,6 +703,10 @@ public class AsyncRpcChannel { }); } + public int getConnectionHashCode() { + return ConnectionId.hashCode(ticket, serviceName, address); + } + @Override public String toString() { return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; @@ -755,21 +717,21 @@ public class AsyncRpcChannel { */ private static final class CallWriteListener implements ChannelFutureListener { private final AsyncRpcChannel rpcChannel; - private final AsyncCall call; + private final int id; - public CallWriteListener(AsyncRpcChannel asyncRpcChannel, AsyncCall call) { + public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) { this.rpcChannel = asyncRpcChannel; - this.call = call; + this.id = id; } - @Override public void operationComplete(ChannelFuture future) throws Exception { + @Override + public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - if(!this.call.isDone()) { - if (future.cause() instanceof IOException) { - rpcChannel.failCall(call, (IOException) future.cause()); - } else { - rpcChannel.failCall(call, new IOException(future.cause())); - } + AsyncCall call = rpcChannel.removePendingCall(id); + if (future.cause() instanceof IOException) { + call.setFailed((IOException) future.cause()); + } else { + call.setFailed(new IOException(future.cause())); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 30b622a..74173f7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -23,6 +23,7 @@ import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcChannel; import com.google.protobuf.RpcController; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; @@ -38,6 +39,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; @@ -55,6 +57,7 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -169,16 +172,16 @@ public class AsyncRpcClient extends AbstractRpcClient { * @throws InterruptedException if call is interrupted * @throws java.io.IOException if a connection failure is encountered */ - @Override protected Pair call(PayloadCarryingRpcController pcrc, + @Override + protected Pair call(PayloadCarryingRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, InetSocketAddress addr) throws IOException, InterruptedException { - final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - Promise promise = connection.callMethodWithPromise(md, pcrc, param, returnType); - + Promise promise = connection.callMethod(md, pcrc, param, returnType); + long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; try { - Message response = promise.get(); + Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); return new Pair<>(response, pcrc.cellScanner()); } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { @@ -186,6 +189,8 @@ public class AsyncRpcClient extends AbstractRpcClient { } else { throw new IOException(e.getCause()); } + } catch (TimeoutException e) { + throw new CallTimeoutException(promise.toString()); } } @@ -337,12 +342,16 @@ public class AsyncRpcClient extends AbstractRpcClient { /** * Remove connection from pool - * - * @param connectionHashCode of connection */ - public void removeConnection(int connectionHashCode) { + public void removeConnection(AsyncRpcChannel connection) { + int connectionHashCode = connection.getConnectionHashCode(); synchronized (connections) { - this.connections.remove(connectionHashCode); + // we use address as cache key, so we should check here to prevent remove the + // wrong connection + AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode); + if (connectionInPool == connection) { + this.connections.remove(connectionHashCode); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index d71bf5e..a900140 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.Message; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CellScanner; @@ -29,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.ipc.RemoteException; -import java.io.IOException; +import com.google.protobuf.Message; /** * Handles Hbase responses @@ -52,16 +54,12 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf inBuffer = (ByteBuf) msg; ByteBufInputStream in = new ByteBufInputStream(inBuffer); - - if (channel.shouldCloseConnection) { - return; - } int totalSize = inBuffer.readableBytes(); try { // Read the header RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in); int id = responseHeader.getCallId(); - AsyncCall call = channel.calls.get(id); + AsyncCall call = channel.removePendingCall(id); if (call == null) { // So we got a response for which we have no corresponding 'call' here on the client-side. // We probably timed out waiting, cleaned up all references, and now the server decides @@ -85,7 +83,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { equals(FatalConnectionException.class.getName())) { channel.close(re); } else { - channel.failCall(call, re); + call.setFailed(re); } } else { Message value = null; @@ -104,13 +102,11 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { } call.setSuccess(value, cellBlockScanner); } - channel.calls.remove(id); } catch (IOException e) { // Treat this as a fatal condition and close this connection channel.close(e); } finally { inBuffer.release(); - channel.cleanupCalls(false); } } -- 1.9.1