From 6183da51ae0d64cdbc3d0c7d097698b1f560d6be Mon Sep 17 00:00:00 2001 From: Jurriaan Mous Date: Wed, 11 Feb 2015 10:37:12 +0100 Subject: [PATCH] HBASE-13011 --- .../java/org/apache/hadoop/hbase/ipc/AsyncCall.java | 20 ++++++++++++++++++++ .../org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 6 ++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java index c35238c..3a51fc7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java @@ -50,6 +50,8 @@ public class AsyncCall extends DefaultPromise { final long startTime; final long rpcTimeout; + private boolean writeLock = false; + /** * Constructor * @@ -132,4 +134,22 @@ public class AsyncCall extends DefaultPromise { public long getRpcTimeout() { return rpcTimeout; } + + /** + * Check if call is write locked + * + * @return true if call is write locked + */ + public boolean isWriteLocked() { + return writeLock; + } + + /** + * Set the write lock on the call so it can only be written once + * + * @param writeLock true if write lock should be on. + */ + public void setWriteLock(boolean writeLock) { + this.writeLock = writeLock; + } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 054c9b5..fb05dc5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -400,10 +400,12 @@ public class AsyncRpcChannel { */ private void writeRequest(final AsyncCall call) { try { - if (shouldCloseConnection) { + if (shouldCloseConnection || call.isWriteLocked()) { return; } + call.setWriteLock(true); + final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader .newBuilder(); requestHeaderBuilder.setCallId(call.id) @@ -550,7 +552,7 @@ public class AsyncRpcChannel { * @param e exception on close */ public void close(final Throwable e) { - client.removeConnection(ConnectionId.hashCode(ticket,serviceName,address)); + client.removeConnection(ConnectionId.hashCode(ticket, serviceName, address)); // Move closing from the requesting thread to the channel thread channel.eventLoop().execute(new Runnable() { -- 1.9.3 (Apple Git-50)