From ba2db9aacd68c8e38b1be466c374d3b91c482030 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Wed, 27 Dec 2017 17:39:52 +0800 Subject: [PATCH] HBASE-19643 Need to update cache location when get error in AsyncBatchRpcRetryingCaller --- .../hbase/client/AsyncBatchRpcRetryingCaller.java | 3 +++ .../hadoop/hbase/client/TestAsyncTableBatch.java | 26 ++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7249435..8d59130 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -262,6 +262,7 @@ class AsyncBatchRpcRetryingCaller { } else if (result instanceof Throwable) { Throwable error = translateException((Throwable) result); logException(tries, () -> Stream.of(regionReq), error, serverName); + conn.getLocator().updateCachedLocation(regionReq.loc, error); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), getExtraContextForError(serverName)); @@ -364,6 +365,8 @@ class AsyncBatchRpcRetryingCaller { ServerName serverName) { Throwable error = translateException(t); logException(tries, () -> actionsByRegion.values().stream(), error, serverName); + actionsByRegion + .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error)); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, serverName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 489ad1d..f47e6e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -179,6 +180,31 @@ public class TestAsyncTableBatch { } @Test + public void testWithRegionServerFailover() throws Exception { + AsyncTable table = tableGetter.apply(TABLE_NAME); + table.putAll(IntStream.range(0, COUNT) + .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) + .collect(Collectors.toList())).get(); + TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests"); + Thread.sleep(100); + table.putAll(IntStream.range(COUNT, 2 * COUNT) + .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) + .collect(Collectors.toList())).get(); + List results = table.getAll( + IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) + .get(); + assertEquals(2 * COUNT, results.size()); + results.forEach(r -> assertFalse(r.isEmpty())); + table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i))) + .collect(Collectors.toList())).get(); + results = table.getAll( + IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) + .get(); + assertEquals(2 * COUNT, results.size()); + results.forEach(r -> assertTrue(r.isEmpty())); + } + + @Test public void testMixed() throws InterruptedException, ExecutionException, IOException { AsyncTable table = tableGetter.apply(TABLE_NAME); table.putAll(IntStream.range(0, 7) -- 1.9.1