From e1d2cd2b478e605de82b6549709043a714bf8334 Mon Sep 17 00:00:00 2001
From: Jurriaan Mous
Date: Thu, 16 Jun 2016 10:13:09 +0200
Subject: [PATCH] HBASE-15978 Netty API leaked into public API

---
 .../org/apache/hadoop/hbase/client/Future.java     | 34 -------------
 .../hbase/client/ResponseFutureListener.java       | 30 ------------
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java     |  8 +--
 .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java   |  6 +--
 .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java      | 12 +++--
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java    | 57 +++++++++-------------
 .../java/org/apache/hadoop/hbase/ipc/Promise.java  | 38 ---------------
 .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 16 +++---
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   | 27 +++++-----
 9 files changed, 59 insertions(+), 169 deletions(-)
 delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
 delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
 delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
deleted file mode 100644
index 99a8baa..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * Promise for responses
- * @param <V> Value type
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_SAME_SIMPLE_NAME_AS_INTERFACE",
-    justification="Agree that this can be confusing but folks will pull in this and think twice "
-        + "about pulling in netty; incidence of confusion should be rare in this case.")
-public interface Future<V> extends io.netty.util.concurrent.Future<V> {
-}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
deleted file mode 100644
index f23dc8f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Specific interface for the Response future listener
- * @param <V> Value type.
- */
-@InterfaceAudience.Private
-public interface ResponseFutureListener<V>
-    extends GenericFutureListener<Future<V>> {
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 89e6ca4..8456a8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
@@ -39,7 +40,7 @@ import org.apache.hadoop.ipc.RemoteException;
  * @param Message returned in communication to be converted
  */
 @InterfaceAudience.Private
-public class AsyncCall extends Promise {
+public class AsyncCall extends CompletableFuture {
   private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
 
   final int id;
@@ -81,7 +82,6 @@ public class AsyncCall extends Promise {
       md, Message param, CellScanner cellScanner, M responseDefaultType,
       MessageConverter messageConverter, IOExceptionConverter exceptionConverter,
       long rpcTimeout, int priority, MetricsConnection metrics) {
-    super(channel.getEventExecutor());
 
     this.channel = channel;
     this.id = connectId;
@@ -138,7 +138,7 @@ public class AsyncCall extends Promise {
     }
 
     try {
-      this.setSuccess(
+      this.complete(
           this.messageConverter.convert(value, cellBlockScanner)
       );
     } catch (IOException e) {
@@ -163,7 +163,7 @@ public class AsyncCall extends Promise {
       exception = this.exceptionConverter.convert(exception);
     }
 
-    this.setFailure(exception);
+    this.completeExceptionally(exception);
   }
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 8cc730f..5173246 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -23,10 +23,10 @@ import com.google.protobuf.Message;
 import io.netty.util.concurrent.EventExecutor;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
 
 /**
  * Interface for Async Rpc Channels
@@ -44,9 +44,9 @@ public interface AsyncRpcChannel {
    * @param exceptionConverter for converting exceptions
    * @param rpcTimeout timeout for request
    * @param priority for request
-   * @return Promise for the response Message
+   * @return CompletableFuture for the response Message
    */
-  Future callMethod(
+  CompletableFuture callMethod(
       final Descriptors.MethodDescriptor method,
       final Message request, final CellScanner cellScanner, R responsePrototype,
       MessageConverter messageConverter,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
index 6b7dc5b..bc0d26f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
@@ -30,6 +31,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.GenericFutureListener;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -38,10 +40,11 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Locale;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import javax.security.sasl.SaslException;
@@ -51,7 +54,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@@ -320,10 +322,10 @@
    * @param responsePrototype to construct response with
    * @param rpcTimeout timeout for request
    * @param priority for request
-   * @return Promise for the response Message
+   * @return CompletableFuture for the response Message
    */
   @Override
-  public Future callMethod(
+  public CompletableFuture callMethod(
       final Descriptors.MethodDescriptor method,
       final Message request,final CellScanner cellScanner, R responsePrototype,
       MessageConverter messageConverter, IOExceptionConverter
@@ -334,7 +336,7 @@ public class AsyncRpcChannelImpl implements AsyncRpcChannel {
 
     synchronized (pendingCalls) {
       if (closed) {
-        call.setFailure(new ConnectException());
+        call.completeExceptionally(new ConnectException());
         return call;
       }
       pendingCalls.put(call.id, call);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 723a234..fd300f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -23,6 +23,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcChannel;
 import com.google.protobuf.RpcController;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -42,10 +43,12 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,9 +59,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.ResponseFutureListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.JVM;
 import org.apache.hadoop.hbase.util.Pair;
@@ -240,7 +241,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
 
     final AsyncRpcChannel connection =
        createRpcChannel(md.getService().getName(), addr, ticket);
-    final Future promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
+    final CompletableFuture future = connection.callMethod(
+        md, param, pcrc.cellScanner(), returnType,
         getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
         pcrc.getPriority());
@@ -248,13 +250,13 @@
       @Override
       public void run(Object parameter) {
         // Will automatically fail the promise with CancellationException
-        promise.cancel(true);
+        future.cancel(true);
       }
     });
 
     long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
     try {
-      Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
+      Message response = timeout > 0 ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
       return new Pair<>(response, pcrc.cellScanner());
     } catch (ExecutionException e) {
       if (e.getCause() instanceof IOException) {
@@ -263,7 +265,7 @@
         throw wrapException(addr, (Exception) e.getCause());
       }
     } catch (TimeoutException e) {
-      CallTimeoutException cte = new CallTimeoutException(promise.toString());
+      CallTimeoutException cte = new CallTimeoutException(future.toString());
       throw wrapException(addr, cte);
     }
   }
@@ -290,37 +292,24 @@
     try {
       connection = createRpcChannel(md.getService().getName(), addr, ticket);
 
-      ResponseFutureListener listener =
-          new ResponseFutureListener() {
-            @Override
-            public void operationComplete(Future future) throws Exception {
-              if (!future.isSuccess()) {
-                Throwable cause = future.cause();
-                if (cause instanceof IOException) {
-                  pcrc.setFailed((IOException) cause);
-                } else {
-                  pcrc.setFailed(new IOException(cause));
-                }
+      CompletableFuture future = connection.callMethod(md, param, pcrc.cellScanner(),
+          returnType, getMessageConverterWithRpcController(pcrc), null,
+          pcrc.getCallTimeout(), pcrc.getPriority());
+
+      future.whenCompleteAsync(new BiConsumer() {
+        @Override
+        public void accept(Message message, Throwable cause) {
+          if(cause != null) {
+            if (cause instanceof IOException) {
+              pcrc.setFailed((IOException) cause);
             } else {
-                try {
-                  done.run(future.get());
-                } catch (ExecutionException e) {
-                  Throwable cause = e.getCause();
-                  if (cause instanceof IOException) {
-                    pcrc.setFailed((IOException) cause);
-                  } else {
-                    pcrc.setFailed(new IOException(cause));
-                  }
-                } catch (InterruptedException e) {
-                  pcrc.setFailed(new IOException(e));
-                }
+              pcrc.setFailed(new IOException(cause));
             }
+          }else {
+            done.run(message);
           }
-        };
-      connection.callMethod(md, param, pcrc.cellScanner(), returnType,
-          getMessageConverterWithRpcController(pcrc), null,
-          pcrc.getCallTimeout(), pcrc.getPriority())
-          .addListener(listener);
+        }
+      });
     } catch (StoppedRpcClientException|FailedServerException e) {
       pcrc.setFailed(e);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
deleted file mode 100644
index 0d05db8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
-
-/**
- * Abstract response promise
- * @param <V> Type of result contained in Promise
- */
-@InterfaceAudience.Private
-public class Promise<V> extends DefaultPromise<V> implements Future<V> {
-  /**
-   * Constructor
-   * @param eventLoop to handle events on
-   */
-  public Promise(EventExecutor eventLoop) {
-    super(eventLoop);
-  }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index dc05af1..e31af78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -52,6 +52,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -66,7 +67,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
@@ -1469,7 +1469,7 @@
 
   @Override
   @SuppressWarnings("unchecked")
-  public Future callMethod(final MethodDescriptor method,
+  public CompletableFuture callMethod(final MethodDescriptor method,
       final Message request, CellScanner cellScanner, final R responsePrototype,
       final MessageConverter messageConverter, final IOExceptionConverter
       exceptionConverter, long rpcTimeout, int priority) {
@@ -1477,7 +1477,7 @@
     pcrc.setPriority(priority);
     pcrc.setCallTimeout((int) rpcTimeout);
 
-    final Promise promise = new Promise<>(executor);
+    final CompletableFuture future = new CompletableFuture<>();
 
     executor.execute(new Runnable() {
       @Override
@@ -1494,21 +1494,21 @@
             LOG.trace("Call: " + method.getName() + ", callTime: "
                 + cs.getCallTimeMs() + "ms");
           }
-          promise.setSuccess(
-            messageConverter.convert((R) call.response, call.cells)
+          future.complete(
+              messageConverter.convert((R) call.response, call.cells)
           );
         } catch (InterruptedException e) {
-          promise.cancel(true);
+          future.cancel(true);
         } catch (IOException e) {
           if(exceptionConverter != null) {
             e = exceptionConverter.convert(e);
           }
-          promise.setFailure(e);
+          future.completeExceptionally(e);
         }
       }
     });
 
-    return promise;
+    return future;
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 45cec78..e114f43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -17,15 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.internal.verification.VerificationModeFactory.times;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingRpcChannel;
@@ -35,6 +26,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -42,9 +34,11 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +51,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@@ -72,9 +65,17 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
 /**
  * Some basic ipc tests.
  */
@@ -394,7 +395,7 @@ public abstract class AbstractTestIPC {
       final EchoRequestProto echoRequest =
           EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
       final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
-      Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
+      assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
     } finally {
       rpcServer.stop();
     }
@@ -545,7 +546,7 @@ public abstract class AbstractTestIPC {
 
       AsyncRpcChannel channel =
           client.createRpcChannel(md.getService().getName(), serverName,
              User.getCurrent());
-      final Future f = channel
+      final CompletableFuture f = channel
          .callMethod(md, param, null, md.getOutputType().toProto(),
              MessageConverter.NO_CONVERTER, null, 1000, HConstants.NORMAL_QOS);
-- 
2.5.0
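Usage note (illustrative, not part of the patch): with this change, AsyncRpcChannel.callMethod and the async path in RpcClientImpl hand back a plain java.util.concurrent.CompletableFuture instead of the removed org.apache.hadoop.hbase.client.Future, so callers complete, fail and wait on JDK types only. The sketch below mirrors the invocation used in AbstractTestIPC above; the class name AsyncCallMethodSketch, its placement in the org.apache.hadoop.hbase.ipc package, the Message type parameter on the returned future, and the assumption that `channel`, `md` and `param` are prepared as in that test are all illustrative, not taken from the patch.

package org.apache.hadoop.hbase.ipc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;

import org.apache.hadoop.hbase.HConstants;

public class AsyncCallMethodSketch {
  static Message callOnce(AsyncRpcChannel channel, Descriptors.MethodDescriptor md,
      Message param) throws Exception {
    // The channel now returns a JDK CompletableFuture rather than a Netty Future.
    CompletableFuture<Message> f = channel.callMethod(md, param, null,
        md.getOutputType().toProto(), MessageConverter.NO_CONVERTER, null,
        1000, HConstants.NORMAL_QOS);

    // Non-blocking completion handling; the lambda is the same shape as the anonymous
    // BiConsumer that AsyncRpcClient registers via whenCompleteAsync. What used to be
    // future.cause() on the Netty future arrives here as the `error` argument.
    f.whenComplete((response, error) -> {
      if (error != null) {
        // failure path: report or convert the Throwable
      } else {
        // success path: use `response`
      }
    });

    // Blocking callers keep the standard java.util.concurrent.Future contract,
    // just as AsyncRpcClient does with get(timeout, unit).
    return f.get(1000, TimeUnit.MILLISECONDS);
  }
}

Because CompletableFuture also implements java.util.concurrent.Future, cancellation and timed get() keep working for existing blocking callers, while the public client API no longer exposes any io.netty types in its signatures.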