commit 83a4e7169b4a045598ab8cda3f3d5cbd02d5e0c3 Author: Enis Soztutar Date: Thu Oct 1 15:34:32 2015 -0700 HBASE-14535 v1 diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcWithChaosMonkey.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcWithChaosMonkey.java new file mode 100644 index 0000000..b77ea7c --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcWithChaosMonkey.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.protobuf.ServiceException; +import com.google.protobuf.Descriptors.MethodDescriptor; + +/** + * A non-deterministic test to simulate a multi-threaded client / server cluster setup. Tries to + * expose concurrency / deadlock issues in RpcClient. Starts 10 rpc servers and 30 clients and a thread + * to stop the rpc servers or start new rpc servers. If the test times out, it may indicate that + * there is a deadlock or leak happening. 
+ * + * Due to the non-deterministic nature of the test, it is advisable to run this test 10-20 times + * to see whether any of the runs go into a timeout. + */ +@RunWith(Parameterized.class) +@Category({ MediumTests.class }) +public class TestRpcWithChaosMonkey extends AbstractTestIPC { + + private static final Log LOG = LogFactory.getLog(TestRpcWithChaosMonkey.class); + + private boolean isSyncClient; + + @Parameters + public static Collection parameters() { + List paramList = new ArrayList(); + paramList.add(new Object[] { true}); + // paramList.add(new Object[] { false}); TODO: async test fails + return paramList; + } + + public TestRpcWithChaosMonkey(boolean isSyncClient) { + this.isSyncClient = isSyncClient; + } + + @Override + protected AbstractRpcClient createRpcClient(Configuration conf) { + return isSyncClient ? + new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) : + new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) { + @Override + Codec getCodec() { + return null; + } + }; + } + + static class Cluster { + Random random = new Random(); + ReadWriteLock lock = new ReentrantReadWriteLock(); + HashMap rpcServers = new HashMap<>(); + List serverList = new ArrayList<>(); + int maxServers; + int minServers; + + Cluster(int minServers, int maxServers) { + this.minServers = minServers; + this.maxServers = maxServers; + } + + TestRpcServer startServer() throws IOException { + lock.writeLock().lock(); + try { + if (rpcServers.size() >= maxServers) { + return null; + } + + TestRpcServer rpcServer = new TestRpcServer(); + rpcServer.start(); + rpcServers.put(rpcServer.getListenerAddress(), rpcServer); + serverList.add(rpcServer); + LOG.info("Started server: " + rpcServer.getListenerAddress()); + return rpcServer; + } finally { + lock.writeLock().unlock(); + } + } + + void stopRandomServer() throws Exception { + lock.writeLock().lock(); + TestRpcServer rpcServer = null; + try { + if (rpcServers.size() <= minServers) { + return; + } + int size = 
rpcServers.size(); + int rand = random.nextInt(size); + rpcServer = serverList.remove(rand); + rpcServers.remove(rpcServer.getListenerAddress()); + } finally { + lock.writeLock().unlock(); + } + + if (rpcServer != null) { + stopServer(rpcServer); + } + } + + void stopServer(TestRpcServer rpcServer) throws InterruptedException { + LOG.info("Stopping server: " + rpcServer.getListenerAddress()); + rpcServer.stop(); + rpcServer.join(); + LOG.info("Stopped server: " + rpcServer.getListenerAddress()); + } + + void stopRunning() throws InterruptedException { + lock.writeLock().lock(); + try { + for (TestRpcServer rpcServer : serverList) { + stopServer(rpcServer); + } + + } finally { + lock.writeLock().unlock(); + } + } + + TestRpcServer getRandomServer() { + lock.readLock().lock();; + try { + int size = rpcServers.size(); + int rand = random.nextInt(size); + return serverList.get(rand); + } finally { + lock.readLock().unlock(); + } + } + } + + static class MiniChaosMonkey extends Thread { + AtomicBoolean running = new AtomicBoolean(true); + Random random = new Random(); + AtomicReference exception = new AtomicReference<>(null); + Cluster cluster; + + public MiniChaosMonkey(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public void run() { + while (running.get()) { + switch (random.nextInt() % 2) { + case 0: //start a server + try { + cluster.startServer(); + } catch (Exception e) { + exception.compareAndSet(null, e); + } + break; + + case 1: // stop a server + try { + cluster.stopRandomServer(); + } catch (Exception e) { + exception.compareAndSet(null, e); + } + default: + } + + Threads.sleep(100); + } + } + + void stopRunning() { + running.set(false); + } + + void rethrowException() throws Exception { + if (exception.get() != null) { + throw exception.get(); + } + } + } + + static class SimpleClient extends Thread { + AbstractRpcClient rpcClient; + AtomicBoolean running = new AtomicBoolean(true); + AtomicReference exception = new 
AtomicReference<>(null); + Cluster cluster; + String id; + long numCalls = 0; + + public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) { + this.cluster = cluster; + this.rpcClient = rpcClient; + this.id = id; + } + + @Override + public void run() { + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + + while (running.get()) { + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(id + numCalls).build(); + EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build(); + + TestRpcServer server = cluster.getRandomServer(); + try { + User user = User.getCurrent(); + ret = (EchoResponseProto) + rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress()); + } catch (Exception e) { + LOG.warn(e); + continue; // expected in case connection is closing or closed + } + + try { + assertNotNull(ret); + assertEquals(id + numCalls, ret.getMessage()); + } catch (Throwable t) { + exception.compareAndSet(null, t); + } + + numCalls++; + } + } + + void stopRunning() { + running.set(false); + } + + void rethrowException() throws Throwable { + if (exception.get() != null) { + throw exception.get(); + } + } + + } + + @Test (timeout = 90000) + public void testRpcWithChaosMonkey() throws Throwable { + Configuration conf = HBaseConfiguration.create(); + Cluster cluster = new Cluster(10, 100); + for (int i = 0; i < 10; i++) { + cluster.startServer(); + } + + ArrayList clients = new ArrayList<>(); + + // all threads should share the same rpc client + AbstractRpcClient rpcClient = createRpcClient(conf); + + for (int i = 0; i < 30; i++) { + String clientId = "client_" + i + "_"; + LOG.info("Starting client: " + clientId); + SimpleClient client = new SimpleClient(cluster, rpcClient, clientId); + client.start(); + clients.add(client); + } + + LOG.info("Starting MiniChaosMonkey"); + MiniChaosMonkey cm = new MiniChaosMonkey(cluster); + cm.start(); + + Threads.sleep(30000); + + 
LOG.info("Stopping MiniChaosMonkey"); + cm.stopRunning(); + cm.join(); + cm.rethrowException(); + + LOG.info("Stopping clients"); + for (SimpleClient client : clients) { + LOG.info("Stopping client: " + client.id); + System.out.println(client.id + " numCalls:" + client.numCalls); + client.stopRunning(); + client.join(); + client.rethrowException(); + assertTrue(client.numCalls > 10); + } + + LOG.info("Stopping RpcClient"); + rpcClient.close(); + + LOG.info("Stopping Cluster"); + cluster.stopRunning(); + } + + // below not used + + @Override + protected AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) + throws IOException { + return null; + } + + @Override + protected AbstractRpcClient createRpcClientNoCodec(Configuration conf) { + return null; + } + + @Override + public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, + NoSuchMethodException, ServiceException { + } + + @Override + public void testNoCodec() throws InterruptedException, IOException { + } + + @Override + public void testRpcScheduler() throws IOException, InterruptedException { + } + + @Override + public void testRTEDuringConnectionSetup() throws Exception { + } + +}