diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index d2423b3..e6f6f46 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -196,6 +196,12 @@ public class HTable implements Table { cleanupConnectionOnClose = false; // used from tests, don't trust the connection is real this.mutator = new BufferedMutatorImpl(conn, null, null, params); + this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } /** @@ -453,7 +459,7 @@ public class HTable implements Table { } try { Object[] r1 = new Object[gets.size()]; - batch((List)gets, r1); + batch((List)gets, r1, readRpcTimeout); // Translate. Result [] results = new Result[r1.length]; int i = 0; @@ -480,6 +486,15 @@ public class HTable implements Table { } } + public void batch(final List actions, final Object[] results, int timeout) + throws InterruptedException, IOException { + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw ars.getErrors(); + } + } + /** * {@inheritDoc} */ @@ -529,7 +544,7 @@ public class HTable implements Table { throws IOException { Object[] results = new Object[deletes.size()]; try { - batch(deletes, results); + batch(deletes, results, writeRpcTimeout); } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException().initCause(e); } finally { @@ -866,7 +881,7 @@ public class HTable implements Table { Object[] r1= new Object[exists.size()]; try { - batch(exists, r1); + batch(exists, r1, readRpcTimeout); } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException().initCause(e); } @@ -910,17 +925,6 @@ public class HTable implements Table { this.batchCallback(list, results, callback); } - - /** - * Parameterized batch processing, allowing varying return types for different - * {@link Row} implementations. - */ - public void processBatch(final List list, final Object[] results) - throws IOException, InterruptedException { - this.batch(list, results); - } - - @Override public void close() throws IOException { if (this.closed) { diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index bcc052d..656dcfc 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -155,7 +155,7 @@ public class TestAsyncProcess { public List allReqs = new ArrayList(); public AtomicInteger callsCt = new AtomicInteger(); private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - + private long previousTimeout = -1; @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, @@ -210,7 +210,13 @@ public class TestAsyncProcess { // We use results in tests to check things, so override to always save them. return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } - + @Override + public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, + List rows, Batch.Callback callback, Object[] results, + CancellableRegionServerCallable callable, int curTimeout) { + previousTimeout = curTimeout; + return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout); + } @Override protected void updateStats(ServerName server, Map results) { @@ -1273,7 +1279,7 @@ public class TestAsyncProcess { Object[] res = new Object[puts.size()]; try { - ht.processBatch(puts, res); + ht.batch(puts, res); Assert.fail(); } catch (RetriesExhaustedException expected) { } @@ -1314,6 +1320,46 @@ public class TestAsyncProcess { } @Test + public void testReadAndWriteTimeout() throws IOException { + final long readTimeout = 10 * 1000; + final long writeTimeout = 20 * 1000; + Configuration copyConf = new Configuration(conf); + copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout); + copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout); + ClusterConnection conn = createHConnection(); + Mockito.when(conn.getConfiguration()).thenReturn(copyConf); + BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE); + try (HTable ht = new HTable(conn, bufferParam)) { + MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true); + ht.multiAp = ap; + List gets = new LinkedList<>(); + gets.add(new Get(DUMMY_BYTES_1)); + gets.add(new Get(DUMMY_BYTES_2)); + try { + ht.get(gets); + } catch (ClassCastException e) { + // No result response on this test. + } + assertEquals(readTimeout, ap.previousTimeout); + ap.previousTimeout = -1; + + try { + ht.existsAll(gets); + } catch (ClassCastException e) { + // No result response on this test. + } + assertEquals(readTimeout, ap.previousTimeout); + ap.previousTimeout = -1; + + List deletes = new LinkedList<>(); + deletes.add(new Delete(DUMMY_BYTES_1)); + deletes.add(new Delete(DUMMY_BYTES_2)); + ht.delete(deletes); + assertEquals(writeTimeout, ap.previousTimeout); + } + } + + @Test public void testGlobalErrors() throws IOException { ClusterConnection conn = new MyConnectionImpl(conf); BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index a577748..bdf88af 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -42,5 +42,5 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment { RegionServerServices getRegionServerServices(); /** @return shared data between all instances of this coprocessor */ - ConcurrentMap getSharedData(); + ConcurrentMap getSharedData(); }