From 4b140d2b45937c17aca20c43732e82a563ecdc37 Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Wed, 30 Jan 2019 13:19:07 -0800 Subject: [PATCH] HBASE-21775 ADDENDUM - fix TestAsyncProcess --- .../hadoop/hbase/client/TestAsyncProcess.java | 47 ++++++++++++---------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index ae30344..a02eba9 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -68,10 +68,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.*; import org.junit.experimental.categories.Category; import org.mockito.Mockito; import org.slf4j.Logger; @@ -91,9 +88,8 @@ public class TestAsyncProcess { private static final byte[] DUMMY_BYTES_2 = Bytes.toBytes("DUMMY_BYTES_2"); private static final byte[] DUMMY_BYTES_3 = Bytes.toBytes("DUMMY_BYTES_3"); private static final byte[] FAILS = Bytes.toBytes("FAILS"); - private static final Configuration CONF = new Configuration(); - private static final ConnectionConfiguration CONNECTION_CONFIG = - new ConnectionConfiguration(CONF); + private Configuration CONF; + private ConnectionConfiguration CONNECTION_CONFIG; private static final ServerName sn = ServerName.valueOf("s1,1,1"); private static final ServerName sn2 = ServerName.valueOf("s2,2,2"); private static final ServerName sn3 = ServerName.valueOf("s3,3,3"); @@ -123,13 +119,18 @@ public class TestAsyncProcess { private static final int NB_RETRIES = 3; - private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - @BeforeClass - public static void beforeClass(){ + private int RPC_TIMEOUT; + private int OPERATION_TIMEOUT; + + @Before + public void beforeEach() { + this.CONF = new Configuration(); CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); + this.CONNECTION_CONFIG = new ConnectionConfiguration(CONF); + this.RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); } static class CountingThreadFactory implements ThreadFactory { @@ -151,6 +152,7 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List allReqs = new ArrayList<>(); public AtomicInteger callsCt = new AtomicInteger(); + private Configuration conf; private long previousTimeout = -1; final ExecutorService service; @@ -174,6 +176,7 @@ public class TestAsyncProcess { super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); service = Executors.newFixedThreadPool(5); + this.conf = conf; } public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { @@ -191,8 +194,8 @@ public class TestAsyncProcess { .setRowAccess(rows) .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL) .setNeedResults(needResults) - .setRpcTimeout(RPC_TIMEOUT) - .setOperationTimeout(OPERATION_TIMEOUT) + .setRpcTimeout(conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)) + .setOperationTimeout(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)) .build(); return submit(task); } @@ -502,8 +505,8 @@ public class TestAsyncProcess { List hrl; final boolean usedRegions[]; - protected MyConnectionImpl2(List hrl) throws IOException { - super(CONF); + protected MyConnectionImpl2(List hrl, Configuration conf) throws IOException { + super(conf); this.hrl = hrl; this.usedRegions = new boolean[hrl.size()]; } @@ -1030,7 +1033,7 @@ public class TestAsyncProcess { } } - private static ClusterConnection createHConnection() throws IOException { + private ClusterConnection createHConnection() throws IOException { ClusterConnection hc = createHConnectionCommon(); setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); @@ -1041,7 +1044,7 @@ public class TestAsyncProcess { return hc; } - private static ClusterConnection createHConnectionWithReplicas() throws IOException { + private ClusterConnection createHConnectionWithReplicas() throws IOException { ClusterConnection hc = createHConnectionCommon(); setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); @@ -1069,7 +1072,7 @@ public class TestAsyncProcess { Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); } - private static ClusterConnection createHConnectionCommon() { + private ClusterConnection createHConnectionCommon() { ClusterConnection hc = Mockito.mock(ClusterConnection.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); @@ -1424,7 +1427,7 @@ public class TestAsyncProcess { gets.add(get); } - MyConnectionImpl2 con = new MyConnectionImpl2(hrls); + MyConnectionImpl2 con = new MyConnectionImpl2(hrls, CONF); MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads); HTable ht = (HTable) con.getTable(DUMMY_TABLE, ap.service); ht.multiAp = ap; @@ -1611,7 +1614,7 @@ public class TestAsyncProcess { return ap; } - private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, + private BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) { return new BufferedMutatorParams(name) .pool(ap.service) -- 2.7.4