diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 5cfd89f..7a3ba7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -39,6 +39,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.HBaseClientRPC; +import org.apache.hadoop.hbase.ipc.ProtocolMap; import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -541,10 +543,8 @@ public class HConnectionManager { private final Configuration conf; // Known region ServerName.toString() -> RegionClient/Admin - private final ConcurrentHashMap> servers = - new ConcurrentHashMap>(); - private final ConcurrentHashMap connectionLock = - new ConcurrentHashMap(); + private final ConcurrentMap servers = + new ConcurrentHashMap(); /** * Map of table to table {@link HRegionLocation}s. The table key is made @@ -709,7 +709,7 @@ public class HConnectionManager { InetSocketAddress isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - MasterProtocol tryMaster = (MasterProtocol) HBaseClientRPC.getProxy( + MasterProtocol tryMaster = HBaseClientRPC.getProxy( masterProtocolState.protocolClass, masterProtocolState.version, isa, this.conf, this.rpcTimeout); @@ -1330,15 +1330,13 @@ public class HConnectionManager { @Override public ClientProtocol getClient( final String hostname, final int port) throws IOException { - return (ClientProtocol)getProtocol(hostname, port, - clientClass, ClientProtocol.VERSION); + return getProtocol(hostname, port, clientClass, ClientProtocol.VERSION); } @Override public AdminProtocol getAdmin(final String hostname, final int port, final boolean master) throws IOException { - return (AdminProtocol)getProtocol(hostname, port, - adminClass, AdminProtocol.VERSION); + return getProtocol(hostname, port, adminClass, AdminProtocol.VERSION); } /** @@ -1351,48 +1349,37 @@ public class HConnectionManager { * @return Proxy. * @throws IOException */ - VersionedProtocol getProtocol(final String hostname, - final int port, final Class protocolClass, + T getProtocol(final String hostname, + final int port, final Class protocolClass, final long version) throws IOException { String rsName = Addressing.createHostAndPortStr(hostname, port); // See if we already have a connection (common case) - Map protocols = this.servers.get(rsName); + ProtocolMap protocols = this.servers.get(rsName); if (protocols == null) { - protocols = new HashMap(); - Map existingProtocols = - this.servers.putIfAbsent(rsName, protocols); + protocols = new ProtocolMap(); + ProtocolMap existingProtocols = this.servers.putIfAbsent(rsName, protocols); if (existingProtocols != null) { protocols = existingProtocols; } } - String protocol = protocolClass.getName(); - VersionedProtocol server = protocols.get(protocol); - if (server == null) { - // create a unique lock for this RS + protocol (if necessary) - String lockKey = protocol + "@" + rsName; - this.connectionLock.putIfAbsent(lockKey, lockKey); - // get the RS lock - synchronized (this.connectionLock.get(lockKey)) { - // do one more lookup in case we were stalled above - server = protocols.get(protocol); - if (server == null) { - try { - // Only create isa when we need to. - InetSocketAddress address = new InetSocketAddress(hostname, port); - // definitely a cache miss. establish an RPC for this RS - server = HBaseClientRPC.waitForProxy( - protocolClass, version, address, this.conf, - this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); - protocols.put(protocol, server); - } catch (RemoteException e) { - LOG.warn("RemoteException connecting to RS", e); - // Throw what the RemoteException was carrying. - throw e.unwrapRemoteException(); - } + + return protocols.get(protocolClass, new ProtocolMap.ProxyProvider() { + @Override + public T get() throws IOException { + try { + // Only create isa when we need to. + InetSocketAddress address = new InetSocketAddress(hostname, port); + // definitely a cache miss. establish an RPC for this RS + return HBaseClientRPC.waitForProxy( + protocolClass, version, address, conf, + maxRPCAttempts, rpcTimeout, rpcTimeout); + } catch (RemoteException e) { + LOG.warn("RemoteException connecting to RS", e); + // Throw what the RemoteException was carrying. + throw e.unwrapRemoteException(); } } - } - return server; + }); } @Override @@ -2250,7 +2237,7 @@ public class HConnectionManager { delayedClosing.stop("Closing connection"); if (stopProxy) { closeMaster(); - for (Map i : servers.values()) { + for (ProtocolMap i : servers.values()) { for (VersionedProtocol server: i.values()) { HBaseClientRPC.stopProxy(server); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java index 1b4f20b..ff57aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java @@ -120,8 +120,7 @@ public class HBaseClientRPC { * @return proxy * @throws java.io.IOException e */ - @SuppressWarnings("unchecked") - public static VersionedProtocol waitForProxy(Class protocol, + public static T waitForProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, @@ -212,7 +211,7 @@ public class HBaseClientRPC { * @return proxy * @throws java.io.IOException e */ - public static VersionedProtocol getProxy(Class protocol, + public static T getProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, @@ -236,13 +235,13 @@ public class HBaseClientRPC { * @return proxy * @throws java.io.IOException e */ - public static VersionedProtocol getProxy( - Class protocol, + public static T getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { RpcClientEngine engine = getProtocolEngine(protocol, conf); - VersionedProtocol proxy = engine + T proxy = engine .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, getRpcTimeout())); return proxy; @@ -259,8 +258,8 @@ public class HBaseClientRPC { * @return a proxy instance * @throws java.io.IOException e */ - public static VersionedProtocol getProxy( - Class protocol, + public static T getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java index 46873ab..2038171 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java @@ -48,14 +48,14 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { protected final static ClientCache CLIENTS = new ClientCache(); @Override - public VersionedProtocol getProxy( - Class protocol, long clientVersion, + public T getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout); - return (VersionedProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[]{protocol}, invoker); + return protocol.cast(Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtocolMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtocolMap.java new file mode 100644 index 0000000..274ee80 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtocolMap.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; + +/** + * A special map from a protocol to a proxy of the protocol + * with providing the same proxy for equal protocols, + * where a protocol is represented by an instance of {@code Class}, + * and a proxy is represented by an instance of {@code VersionedProtocol}. + *

+ * Thread safe. + */ +public class ProtocolMap { + /** + * Provides a proxy for {@code T}. + * This is invoked by a single thread. + */ + public interface ProxyProvider { + T get() throws IOException; + } + + private final ConcurrentMap, ProxyHolder> holderMap = + new ConcurrentHashMap, ProxyHolder>(); + + private static class ProxyHolder { + final CountDownLatch initLatch = new CountDownLatch(1); + T proxy; + + T init(ProxyProvider provider) throws IOException { + try { + return this.proxy = provider.get(); + } finally { + initLatch.countDown(); + } + } + + /** + * @return null if its initialization is failed + */ + T get() { + boolean interrupted = false; + try { + while (true) { + try { + initLatch.await(); + return proxy; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + } + + /** + * Returns a proxy for the given {@code protocol}. + * For a new protocol, this will get a new proxy from the given {@code provider}. + * + * @throws NullPointerException if {@code protocol} or {@code provider} is null + * @throws IOException thrown by the given {@code provider} + */ + public T get( + Class protocol, ProxyProvider provider) throws IOException { + + if (protocol == null) { + throw new NullPointerException("protocol"); + } + if (provider == null) { + throw new NullPointerException("provider"); + } + + ProxyHolder holder = holderMap.get(protocol); + if (holder != null) { + VersionedProtocol proxy = holder.get(); + if (proxy != null) { + return protocol.cast(proxy); + } + holderMap.remove(protocol, holder); + } + + ProxyHolder newHolder = new ProxyHolder(); + while (true) { + ProxyHolder existingHoder = holderMap.putIfAbsent(protocol, newHolder); + if (existingHoder == null) { + try { + return newHolder.init(provider); + } catch (IOException e) { + holderMap.remove(protocol, newHolder); + throw e; + } + } + + VersionedProtocol existingProxy = existingHoder.get(); + if (existingProxy != null) { + return protocol.cast(existingProxy); + } + holderMap.remove(protocol, existingHoder); + } + } + + /** + * Returns the proxies which has been given by {@link #get(Class, ProxyProvider)}. + * The returned collection is not a view, and changing the collection causes no effect. + */ + public Collection values() { + List result = new ArrayList(); + for (ProxyHolder holder : holderMap.values()) { + VersionedProtocol proxy = holder.get(); + if (proxy != null) { + result.add(proxy); + } + } + return result; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java index f6dcbf9..8d1ce11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java @@ -31,7 +31,7 @@ import java.net.InetSocketAddress; @InterfaceAudience.Private public interface RpcClientEngine { /** Construct a client-side proxy object. */ - VersionedProtocol getProxy(Class protocol, + T getProxy(Class protocol, long clientVersion, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index dc12b7e..fb7e02b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1776,7 +1776,7 @@ public class HRegionServer implements ClientProtocol, try { // Do initial RPC setup. The final argument indicates that the RPC // should retry indefinitely. - master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy( + master = HBaseClientRPC.waitForProxy( RegionServerStatusProtocol.class, RegionServerStatusProtocol.VERSION, isa, this.conf, -1, this.rpcTimeout, this.rpcTimeout); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java index ea915c5..d6bf843 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java @@ -46,9 +46,10 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { private static final Random RANDOM = new Random(System.currentTimeMillis()); public static double chanceOfTimeout = 0.3; private static AtomicInteger invokations = new AtomicInteger(); - - public VersionedProtocol getProxy( - Class protocol, long clientVersion, + + @Override + public T getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { // Start up the requested-for proxy so we can pass-through calls to the underlying @@ -58,8 +59,8 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { ticket, conf, factory, rpcTimeout); RandomTimeoutInvocationHandler invoker = new RandomTimeoutInvocationHandler(actualProxy); - VersionedProtocol object = (VersionedProtocol)Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[]{protocol}, invoker); + T object = protocol.cast(Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker)); return object; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index a4ec061..9dfa29b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -75,7 +75,7 @@ public class TestDelayedRpc { isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0, + TestRpc client = HBaseClientRPC.getProxy(TestRpc.class, 0, rpcServer.getListenerAddress(), conf, 1000); List results = new ArrayList(); @@ -137,7 +137,7 @@ public class TestDelayedRpc { new Class[]{ TestRpcImpl.class }, isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0, + TestRpc client = HBaseClientRPC.getProxy(TestRpc.class, 0, rpcServer.getListenerAddress(), conf, 1000); Thread threads[] = new Thread[MAX_DELAYED_RPC + 1]; @@ -269,7 +269,7 @@ public class TestDelayedRpc { isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0, + TestRpc client = HBaseClientRPC.getProxy(TestRpc.class, 0, rpcServer.getListenerAddress(), conf, 1000); int result = 0xDEADBEEF; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 0d8684e..7c45873 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -120,8 +120,7 @@ public class TestProtoBufRpc { HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class); HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class); - return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class, 0, - addr, conf, 10000); + return HBaseClientRPC.getProxy(TestRpcService.class, 0, addr, conf, 10000); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolMap.java new file mode 100644 index 0000000..54d4acd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolMap.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestProtocolMap { + + private static interface MyProtocol extends VersionedProtocol {} + private static interface MyProtocol2 extends VersionedProtocol {} + private static interface MyProtocol3 extends VersionedProtocol {} + + private static abstract class AbstractProtocol implements VersionedProtocol { + @Override + @Deprecated + public long getProtocolVersion(String protocol, long clientVersion) { + throw new UnsupportedOperationException(); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + throw new UnsupportedOperationException(); + } + } + + private static class MyProtocolImpl extends AbstractProtocol implements MyProtocol {} + private static class MyProtocol2Impl extends AbstractProtocol implements MyProtocol2 {} + private static class MyProtocol3Impl extends AbstractProtocol implements MyProtocol3 {} + + private static class ProxyProviderWithCallCounter + implements ProtocolMap.ProxyProvider { + + final AtomicInteger callCount = new AtomicInteger(); + + @Override + public MyProtocol get() { + callCount.incrementAndGet(); + return new MyProtocolImpl(); + } + } + + private static class MyIOException extends IOException {} + + private enum ProxyProviderToThrowMyIOException + implements ProtocolMap.ProxyProvider { + INSTANCE; + + @Override + public MyProtocol get() throws IOException { + throw new MyIOException(); + } + } + + private static class SingletonProxyProvider + implements ProtocolMap.ProxyProvider { + + final T singleton; + + SingletonProxyProvider(T singleton) { + this.singleton = singleton; + } + + @Override + public T get() { + return singleton; + } + } + + @Test + public void testGet() throws Exception { + ProtocolMap map = new ProtocolMap(); + ProxyProviderWithCallCounter provider = new ProxyProviderWithCallCounter(); + + MyProtocol p = map.get(MyProtocol.class, provider); + Assert.assertTrue(p instanceof MyProtocolImpl); + Assert.assertEquals(1, provider.callCount.get()); + + MyProtocol p2 = map.get(MyProtocol.class, provider); + Assert.assertSame(p, p2); + Assert.assertEquals(1, provider.callCount.get()); + } + + @Test(expected=MyIOException.class) + public void testGetThrowsException() throws Exception { + ProtocolMap map = new ProtocolMap(); + map.get(MyProtocol.class, ProxyProviderToThrowMyIOException.INSTANCE); + } + + @Test + public void testValues() throws Exception { + ProtocolMap map = new ProtocolMap(); + + MyProtocol p = new MyProtocolImpl(); + MyProtocol2 p2 = new MyProtocol2Impl(); + MyProtocol3 p3 = new MyProtocol3Impl(); + + map.get(MyProtocol.class, new SingletonProxyProvider(p)); + map.get(MyProtocol2.class, new SingletonProxyProvider(p2)); + map.get(MyProtocol3.class, new SingletonProxyProvider(p3)); + + Collection actual = map.values(); + + Assert.assertEquals( + new HashSet(Arrays.asList(p, p2, p3)), + new HashSet(actual)); + } + + @Test + public void testContention() throws Exception { + final int concurrentLevel = 100; + final CyclicBarrier readyBarrier = new CyclicBarrier(concurrentLevel + 1); + final ProtocolMap map = new ProtocolMap(); + final ProxyProviderWithCallCounter provider = new ProxyProviderWithCallCounter(); + + ExecutorService service = Executors.newCachedThreadPool(); + List> futures = new ArrayList>(); + + for (int i = 0; i < concurrentLevel; i++) { + futures.add(service.submit(new Callable() { + @Override + public MyProtocol call() throws Exception { + readyBarrier.await(); + return map.get(MyProtocol.class, provider); + } + })); + } + + service.shutdown(); + readyBarrier.await(); + + if (! service.awaitTermination(100, TimeUnit.MILLISECONDS)) { + service.shutdownNow(); + Assert.fail("100msec elapsed before termination"); + } + + Set resultSet = new HashSet(); + for (Future future : futures) { + resultSet.add(future.get()); + } + + Assert.assertEquals(1, resultSet.size()); + Assert.assertEquals(1, provider.callCount.get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java index 17c4861..419fb04 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java @@ -55,7 +55,7 @@ public class TestHMasterRPCException { //try to connect too soon. Retry on SocketTimeoutException. while (i < 20) { try { - MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy( + MasterMonitorProtocol inf = HBaseClientRPC.getProxy( MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100 * 10); inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance()); fail(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 317b346..f30098a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -377,7 +377,6 @@ public class TestTokenAuthentication { Configuration c = server.getConfiguration(); c.set(HConstants.CLUSTER_ID, clusterId.toString()); AuthenticationProtos.AuthenticationService.BlockingInterface proxy = - (AuthenticationProtos.AuthenticationService.BlockingInterface) HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class, BlockingAuthenticationService.VERSION, server.getAddress(), c,