diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 27c70bc..982a2fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -1688,7 +1688,8 @@ public class RpcClient { * @return A blocking rpc channel that goes via this rpc client instance. */ public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, - final User ticket, final int rpcTimeout) { + final User ticket, final int rpcTimeout) + throws IOException { return new BlockingRpcChannelImplementation(this, sn, ticket, rpcTimeout); } @@ -1703,8 +1704,12 @@ public class RpcClient { private final User ticket; protected BlockingRpcChannelImplementation(final RpcClient rpcClient, final ServerName sn, - final User ticket, final int rpcTimeout) { + final User ticket, final int rpcTimeout) + throws IOException { this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); + if (this.isa.isUnresolved()) { + throw new IOException("unknown host: " + sn.getHostname()); + } this.rpcClient = rpcClient; // Set the rpc timeout to be the minimum of configured timeout and whatever the current // thread local setting is. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientRpcClientFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientRpcClientFail.java new file mode 100644 index 0000000..c5024ae --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientRpcClientFail.java @@ -0,0 +1,199 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.BlockingRpcChannel; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category(MediumTests.class) +public class TestClientRpcClientFail { + final Log LOG = LogFactory.getLog(getClass()); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Random RANDOM = new Random(System.currentTimeMillis()); + private static long LAST_LIMIT = HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT; + protected static int SLAVES = 1; + + /** + * @throws Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Test that a client that random fails an RPC to the master retries properly and doesn't + * throw any unexpected exceptions. + */ + @Test + public void testAdminRandomRpcFail() throws Exception { + HConnection lastConnection = null; + int initialInvocations = RandomFailBlockingRpcChannel.invocations.get(); + RpcClient rpcClient = newRandomFailRpcClient(); + try { + for (int i = 0; i < 10; ++i) { + // Ensure the HBaseAdmin uses a new connection by changing Configuration. + Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++LAST_LIMIT); + HBaseAdmin admin = null; + try { + admin = new HBaseAdmin(conf); + HConnection connection = admin.getConnection(); + assertFalse(connection == lastConnection); + lastConnection = connection; + // Override the connection's rpc client for failure testing + RpcClient oldRpcClient = + ((HConnectionManager.HConnectionImplementation) connection).setRpcClient(rpcClient); + if (oldRpcClient != null) { + oldRpcClient.stop(); + } + // run some admin commands + HBaseAdmin.checkHBaseAvailable(conf); + admin.setBalancerRunning(false, false); + } finally { + if (admin != null) { + admin.close(); + if (admin.getConnection().isClosed()) { + rpcClient = newRandomFailRpcClient(); + } + } + } + } + // Ensure the RandomFailRpcEngine is actually being used. + assertTrue(RandomFailBlockingRpcChannel.invocations.get() > initialInvocations); + } finally { + rpcClient.stop(); + } + } + + /** + * Test that a client random fails an RPC to the RS retries properly and doesn't throw any + * unexpected exceptions. + */ + @Test + public void testRegionServerRandomRpcFail() throws Exception { + TableName testTableName = TableName.valueOf("testRegionServerRandomRpcFail"); + HTableDescriptor testTableDescriptor = new HTableDescriptor(testTableName); + testTableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("a"))); + TEST_UTIL.getHBaseAdmin().createTable(testTableDescriptor); + + HConnection lastConnection = null; + int initialInvocations = RandomFailBlockingRpcChannel.invocations.get(); + RpcClient rpcClient = newRandomFailRpcClient(); + try { + for (int i = 0; i < 10; ++i) { + // Ensure the HTable uses a new connection by changing Configuration. + Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++LAST_LIMIT); + HTable htable = null; + try { + htable = new HTable(conf, testTableName); + HConnection connection = htable.getConnection(); + assertFalse(connection == lastConnection); + lastConnection = connection; + // Override the connection's rpc client for failure testing + RpcClient oldRpcClient = + ((HConnectionManager.HConnectionImplementation) connection).setRpcClient(rpcClient); + if (oldRpcClient != null) { + oldRpcClient.stop(); + } + // run some region server commands + // always do put first + if (i == 0 || RANDOM.nextDouble() < 0.5) { + Put put = new Put(Bytes.toBytes("row")); + put.add(Bytes.toBytes("a"), Bytes.toBytes("a"), Bytes.toBytes(RANDOM.nextInt())); + htable.put(put); + } else { + Get get = new Get(Bytes.toBytes("row")); + get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("a")); + htable.get(get); + } + } finally { + if (htable != null) { + htable.close(); + if (htable.getConnection().isClosed()) { + rpcClient = newRandomFailRpcClient(); + } + } + } + } + // Ensure the RandomFailRpcEngine is actually being used. + assertTrue(RandomFailBlockingRpcChannel.invocations.get() > initialInvocations); + } finally { + rpcClient.stop(); + } + } + + private static RpcClient newRandomFailRpcClient() { + return new RpcClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()) { + // Return my own instance, one that does random failures + @Override + public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, + User ticket, int rpcTimeout) + throws IOException { + return new RandomFailBlockingRpcChannel(this, sn, ticket, rpcTimeout); + } + }; + } + + /** + * Blocking rpc channel that goes via hbase rpc which will random fail. + */ + static class RandomFailBlockingRpcChannel extends RpcClient.BlockingRpcChannelImplementation { + private static final Log LOG = LogFactory.getLog(RandomFailBlockingRpcChannel.class); + private static final Random RANDOM = new Random(System.currentTimeMillis()); + private static final double CHANCE_OF_FAIL = 0.5; + private static AtomicInteger invocations = new AtomicInteger(); + + RandomFailBlockingRpcChannel(final RpcClient rpcClient, final ServerName sn, + final User ticket, final int rpcTimeout) + throws IOException { + super(rpcClient, sn, ticket, rpcTimeout); + invocations.getAndIncrement(); + if (RANDOM.nextFloat() < CHANCE_OF_FAIL) { + LOG.error("rpc channel fake failure"); + throw new IOException("rpc channel fake failure"); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 998f7ad..da39172 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.net.SocketTimeoutException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -129,7 +130,8 @@ public class TestClientTimeouts { // Return my own instance, one that does random timeouts @Override public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, - User ticket, int rpcTimeout) { + User ticket, int rpcTimeout) + throws IOException { return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout); } }; @@ -144,7 +146,8 @@ public class TestClientTimeouts { private static AtomicInteger invokations = new AtomicInteger(); RandomTimeoutBlockingRpcChannel(final RpcClient rpcClient, final ServerName sn, - final User ticket, final int rpcTimeout) { + final User ticket, final int rpcTimeout) + throws IOException { super(rpcClient, sn, ticket, rpcTimeout); }