From efdc91f27e7e3ba163256caf910812bd797dfa26 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 31 Dec 2018 16:43:26 +0800 Subject: [PATCH] HBASE-17356 Add replica get support --- .../apache/hadoop/hbase/RegionLocations.java | 30 +- .../client/AsyncBatchRpcRetryingCaller.java | 114 ++++--- .../client/AsyncConnectionConfiguration.java | 12 + .../hbase/client/AsyncConnectionImpl.java | 1 - .../hbase/client/AsyncMetaRegionLocator.java | 125 +++++--- .../client/AsyncNonMetaRegionLocator.java | 291 ++++++++++-------- .../hbase/client/AsyncRegionLocator.java | 129 ++++---- .../client/AsyncRegionLocatorHelper.java | 147 +++++++++ .../hbase/client/AsyncRpcRetryingCaller.java | 15 +- .../client/AsyncRpcRetryingCallerFactory.java | 55 ++-- .../AsyncSingleRequestRpcRetryingCaller.java | 71 +++-- .../hbase/client/AsyncTableRegionLocator.java | 28 +- .../client/AsyncTableRegionLocatorImpl.java | 6 +- .../hbase/client/ConnectionConfiguration.java | 5 +- .../hbase/client/RawAsyncTableImpl.java | 208 +++++++++---- .../apache/hadoop/hbase/util/FutureUtils.java | 60 ++++ .../hbase/client/RegionReplicaTestHelper.java | 161 ++++++++++ .../client/TestAsyncMetaRegionLocator.java | 55 +--- .../client/TestAsyncNonMetaRegionLocator.java | 126 +++++--- ...ncNonMetaRegionLocatorConcurrenyLimit.java | 20 +- ...stAsyncSingleRequestRpcRetryingCaller.java | 56 ++-- .../client/TestAsyncTableLocatePrefetch.java | 4 +- .../TestAsyncTableRegionReplicasGet.java | 204 ++++++++++++ .../hbase/client/TestZKAsyncRegistry.java | 44 +-- 24 files changed, 1362 insertions(+), 605 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java diff 
--git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java index fd6f3c7e71..f98bf03b79 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java @@ -56,8 +56,8 @@ public class RegionLocations { int index = 0; for (HRegionLocation loc : locations) { if (loc != null) { - if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) { - maxReplicaId = loc.getRegionInfo().getReplicaId(); + if (loc.getRegion().getReplicaId() >= maxReplicaId) { + maxReplicaId = loc.getRegion().getReplicaId(); maxReplicaIdIndex = index; } } @@ -72,7 +72,7 @@ public class RegionLocations { this.locations = new HRegionLocation[maxReplicaId + 1]; for (HRegionLocation loc : locations) { if (loc != null) { - this.locations[loc.getRegionInfo().getReplicaId()] = loc; + this.locations[loc.getRegion().getReplicaId()] = loc; } } } @@ -146,7 +146,7 @@ public class RegionLocations { public RegionLocations remove(HRegionLocation location) { if (location == null) return this; if (location.getRegion() == null) return this; - int replicaId = location.getRegionInfo().getReplicaId(); + int replicaId = location.getRegion().getReplicaId(); if (replicaId >= locations.length) return this; // check whether something to remove. HRL.compareTo() compares ONLY the @@ -203,14 +203,14 @@ public class RegionLocations { // in case of region replication going down, we might have a leak here. 
int max = other.locations.length; - HRegionInfo regionInfo = null; + RegionInfo regionInfo = null; for (int i = 0; i < max; i++) { HRegionLocation thisLoc = this.getRegionLocation(i); HRegionLocation otherLoc = other.getRegionLocation(i); - if (regionInfo == null && otherLoc != null && otherLoc.getRegionInfo() != null) { + if (regionInfo == null && otherLoc != null && otherLoc.getRegion() != null) { // regionInfo is the first non-null HRI from other RegionLocations. We use it to ensure that // all replica region infos belong to the same region with same region id. - regionInfo = otherLoc.getRegionInfo(); + regionInfo = otherLoc.getRegion(); } HRegionLocation selectedLoc = selectRegionLocation(thisLoc, @@ -232,7 +232,7 @@ public class RegionLocations { for (int i=0; i < newLocations.length; i++) { if (newLocations[i] != null) { if (!RegionReplicaUtil.isReplicasForSameRegion(regionInfo, - newLocations[i].getRegionInfo())) { + newLocations[i].getRegion())) { newLocations[i] = null; } } @@ -273,9 +273,9 @@ public class RegionLocations { boolean checkForEquals, boolean force) { assert location != null; - int replicaId = location.getRegionInfo().getReplicaId(); + int replicaId = location.getRegion().getReplicaId(); - HRegionLocation oldLoc = getRegionLocation(location.getRegionInfo().getReplicaId()); + HRegionLocation oldLoc = getRegionLocation(location.getRegion().getReplicaId()); HRegionLocation selectedLoc = selectRegionLocation(oldLoc, location, checkForEquals, force); @@ -288,8 +288,8 @@ public class RegionLocations { // ensure that all replicas share the same start code. 
Otherwise delete them for (int i=0; i < newLocations.length; i++) { if (newLocations[i] != null) { - if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegionInfo(), - newLocations[i].getRegionInfo())) { + if (!RegionReplicaUtil.isReplicasForSameRegion(location.getRegion(), + newLocations[i].getRegion())) { newLocations[i] = null; } } @@ -317,8 +317,8 @@ public class RegionLocations { public HRegionLocation getRegionLocationByRegionName(byte[] regionName) { for (HRegionLocation loc : locations) { if (loc != null) { - if (Bytes.equals(loc.getRegionInfo().getRegionName(), regionName) - || Bytes.equals(loc.getRegionInfo().getEncodedNameAsBytes(), regionName)) { + if (Bytes.equals(loc.getRegion().getRegionName(), regionName) + || Bytes.equals(loc.getRegion().getEncodedNameAsBytes(), regionName)) { return loc; } } @@ -331,7 +331,7 @@ public class RegionLocations { } public HRegionLocation getDefaultRegionLocation() { - return locations[HRegionInfo.DEFAULT_REPLICA_ID]; + return locations[RegionInfo.DEFAULT_REPLICA_ID]; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 51b89a9ec4..e268b2e88b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -23,8 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.ArrayList; @@ 
-43,24 +42,26 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; - import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Retry caller for batch. 
@@ -121,10 +122,10 @@ class AsyncBatchRpcRetryingCaller { private static final class ServerRequest { public final ConcurrentMap actionsByRegion = - new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); public void addAction(HRegionLocation loc, Action action) { - computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(), + computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), () -> new RegionRequest(loc)).actions.add(action); } } @@ -173,11 +174,10 @@ class AsyncBatchRpcRetryingCaller { Throwable error, ServerName serverName) { if (tries > startLogErrorsCnt) { String regions = - regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'") - .collect(Collectors.joining(",", "[", "]")); - LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName - + " failed, tries=" + tries, - error); + regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'") + .collect(Collectors.joining(",", "[", "]")); + LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName + + " failed, tries=" + tries, error); } } @@ -191,7 +191,7 @@ class AsyncBatchRpcRetryingCaller { errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>()); } errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), - getExtraContextForError(serverName))); + getExtraContextForError(serverName))); } private void addError(Iterable actions, Throwable error, ServerName serverName) { @@ -204,7 +204,7 @@ class AsyncBatchRpcRetryingCaller { return; } ThrowableWithExtraContext errorWithCtx = - new ThrowableWithExtraContext(error, currentTime, extras); + new ThrowableWithExtraContext(error, currentTime, extras); List errors = removeErrors(action); if (errors == null) { errors = Collections.singletonList(errorWithCtx); @@ -227,7 +227,7 @@ class AsyncBatchRpcRetryingCaller { return; } 
future.completeExceptionally(new RetriesExhaustedException(tries, - Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); + Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList()))); }); } @@ -242,9 +242,9 @@ class AsyncBatchRpcRetryingCaller { // multiRequestBuilder will be populated with region actions. // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the // action list. - RequestConverter.buildNoDataRegionActions(entry.getKey(), - entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder, - mutationBuilder, nonceGroup, rowMutationsIndexMap); + RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells, + multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, + rowMutationsIndexMap); } return multiRequestBuilder.build(); } @@ -254,15 +254,15 @@ class AsyncBatchRpcRetryingCaller { RegionResult regionResult, List failedActions, Throwable regionException) { Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException); if (result == null) { - LOG.error("Server " + serverName + " sent us neither result nor exception for row '" - + Bytes.toStringBinary(action.getAction().getRow()) + "' of " - + regionReq.loc.getRegionInfo().getRegionNameAsString()); + LOG.error("Server " + serverName + " sent us neither result nor exception for row '" + + Bytes.toStringBinary(action.getAction().getRow()) + "' of " + + regionReq.loc.getRegion().getRegionNameAsString()); addError(action, new RuntimeException("Invalid response"), serverName); failedActions.add(action); } else if (result instanceof Throwable) { Throwable error = translateException((Throwable) result); logException(tries, () -> Stream.of(regionReq), error, serverName); - conn.getLocator().updateCachedLocation(regionReq.loc, error); + conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); if (error instanceof 
DoNotRetryIOException || tries >= maxAttempts) { failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), getExtraContextForError(serverName)); @@ -281,20 +281,19 @@ class AsyncBatchRpcRetryingCaller { RegionResult regionResult = resp.getResults().get(rn); Throwable regionException = resp.getException(rn); if (regionResult != null) { - regionReq.actions.forEach( - action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions, - regionException)); + regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName, + regionResult, failedActions, regionException)); } else { Throwable error; if (regionException == null) { - LOG.error( - "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); + LOG + .error("Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn)); error = new RuntimeException("Invalid response"); } else { error = translateException(regionException); } logException(tries, () -> Stream.of(regionReq), error, serverName); - conn.getLocator().updateCachedLocation(regionReq.loc, error); + conn.getLocator().updateCachedLocationOnError(regionReq.loc, error); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failAll(regionReq.actions.stream(), tries, error, serverName); return; @@ -314,8 +313,7 @@ class AsyncBatchRpcRetryingCaller { remainingNs = remainingTimeNs(); if (remainingNs <= 0) { failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream()) - .flatMap(r -> r.actions.stream()), - tries); + .flatMap(r -> r.actions.stream()), tries); return; } } else { @@ -366,15 +364,15 @@ class AsyncBatchRpcRetryingCaller { ServerName serverName) { Throwable error = translateException(t); logException(tries, () -> actionsByRegion.values().stream(), error, serverName); - actionsByRegion - .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error)); + actionsByRegion.forEach( + (rn, 
regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error)); if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, serverName); return; } List copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); addError(copiedActions, error, serverName); tryResubmit(copiedActions.stream(), tries); } @@ -407,30 +405,30 @@ class AsyncBatchRpcRetryingCaller { } ConcurrentMap actionsByServer = new ConcurrentHashMap<>(); ConcurrentLinkedQueue locateFailed = new ConcurrentLinkedQueue<>(); - CompletableFuture.allOf(actions - .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), - RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { - if (error != null) { - error = translateException(error); - if (error instanceof DoNotRetryIOException) { - failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); - return; - } - addError(action, error, null); - locateFailed.add(action); - } else { - computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new) - .addAction(loc, action); + addListener(CompletableFuture.allOf(actions + .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), + RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { + if (error != null) { + error = translateException(error); + if (error instanceof DoNotRetryIOException) { + failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), ""); + return; } - })) - .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> { - if (!actionsByServer.isEmpty()) { - send(actionsByServer, tries); - } - if (!locateFailed.isEmpty()) { - tryResubmit(locateFailed.stream(), tries); + addError(action, error, null); + locateFailed.add(action); + } else { + 
computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc, + action); } - }); + })) + .toArray(CompletableFuture[]::new)), (v, r) -> { + if (!actionsByServer.isEmpty()) { + send(actionsByServer, tries); + } + if (!locateFailed.isEmpty()) { + tryResubmit(locateFailed.stream(), tries); + } + }); } public List> call() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 915e9dd46c..fa051a5d41 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; @@ -94,6 +96,10 @@ class AsyncConnectionConfiguration { private final long writeBufferPeriodicFlushTimeoutNs; + // this is for supporting region replica get, if the primary does not finished within this + // timeout, we will send request to secondaries. 
+ private final long primaryCallTimeoutNs; + @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( @@ -124,6 +130,8 @@ class AsyncConnectionConfiguration { this.writeBufferPeriodicFlushTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); + this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( + conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); } long getMetaOperationTimeoutNs() { @@ -181,4 +189,8 @@ class AsyncConnectionConfiguration { long getWriteBufferPeriodicFlushTimeoutNs() { return writeBufferPeriodicFlushTimeoutNs; } + + long getPrimaryCallTimeoutNs() { + return primaryCallTimeoutNs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 078395ba06..361d5b2f92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -152,7 +152,6 @@ class AsyncConnectionImpl implements AsyncConnection { } // we will override this method for testing retry caller, so do not remove this method. 
- @VisibleForTesting AsyncRegionLocator getLocator() { return locator; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 06b5b57fc9..9fef15dbfc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -17,11 +17,16 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.AsyncRegionLocator.*; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; - import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,43 +41,43 @@ class AsyncMetaRegionLocator { private final AsyncRegistry registry; - private final AtomicReference metaRegionLocation = new AtomicReference<>(); + private final AtomicReference metaRegionLocations = new AtomicReference<>(); - private final AtomicReference> metaRelocateFuture = - new AtomicReference<>(); + private final AtomicReference> metaRelocateFuture = + new AtomicReference<>(); AsyncMetaRegionLocator(AsyncRegistry registry) { this.registry = registry; } - CompletableFuture getRegionLocation(boolean reload) { + /** + * Get the region locations for meta region. 
If the location for the given replica is not + * available in the cached locations, then fetch from the HBase cluster. + *

+ * The replicaId parameter is important. If the region replication config for meta + * region is changed, then the cached region locations may not have the locations for new + * replicas. If we do not check the location for the given replica, we will always return the + * cached region locations and cause an infinite loop. + */ + CompletableFuture getRegionLocations(int replicaId, boolean reload) { for (;;) { if (!reload) { - HRegionLocation metaRegionLocation = this.metaRegionLocation.get(); - if (metaRegionLocation != null) { - return CompletableFuture.completedFuture(metaRegionLocation); + RegionLocations locs = this.metaRegionLocations.get(); + if (isGood(locs, replicaId)) { + return CompletableFuture.completedFuture(locs); } } - if (LOG.isTraceEnabled()) { - LOG.trace("Meta region location cache is null, try fetching from registry."); - } + LOG.trace("Meta region location cache is null, try fetching from registry."); if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Start fetching meta region location from registry."); - } - CompletableFuture future = metaRelocateFuture.get(); + LOG.debug("Start fetching meta region location from registry."); + CompletableFuture future = metaRelocateFuture.get(); registry.getMetaRegionLocation().whenComplete((locs, error) -> { if (error != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to fetch meta region location from registry", error); - } + LOG.debug("Failed to fetch meta region location from registry", error); metaRelocateFuture.getAndSet(null).completeExceptionally(error); return; } - HRegionLocation loc = locs.getDefaultRegionLocation(); - if (LOG.isDebugEnabled()) { - LOG.debug("The fetched meta region location is " + loc); - } + LOG.debug("The fetched meta region location is {}" + locs); // Here we update cache before reset future, so it is possible that someone can get a // stale value. Consider this: // 1. 
update cache @@ -82,12 +87,12 @@ class AsyncMetaRegionLocator { // cleared in step 2. // But we do not think it is a big deal as it rarely happens, and even if it happens, the // caller will retry again later, no correctness problems. - this.metaRegionLocation.set(loc); + this.metaRegionLocations.set(locs); metaRelocateFuture.set(null); - future.complete(loc); + future.complete(locs); }); } else { - CompletableFuture future = metaRelocateFuture.get(); + CompletableFuture future = metaRelocateFuture.get(); if (future != null) { return future; } @@ -95,30 +100,56 @@ class AsyncMetaRegionLocator { } } - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - AsyncRegionLocator.updateCachedLocation(loc, exception, l -> metaRegionLocation.get(), - newLoc -> { - for (;;) { - HRegionLocation oldLoc = metaRegionLocation.get(); - if (oldLoc != null && (oldLoc.getSeqNum() > newLoc.getSeqNum() || - oldLoc.getServerName().equals(newLoc.getServerName()))) { - return; - } - if (metaRegionLocation.compareAndSet(oldLoc, newLoc)) { - return; - } - } - }, l -> { - for (;;) { - HRegionLocation oldLoc = metaRegionLocation.get(); - if (!canUpdate(l, oldLoc) || metaRegionLocation.compareAndSet(oldLoc, null)) { - return; - } + private HRegionLocation getCacheLocation(HRegionLocation loc) { + RegionLocations locs = metaRegionLocations.get(); + return locs != null ? 
locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; + } + + private void addLocationToCache(HRegionLocation loc) { + for (;;) { + int replicaId = loc.getRegion().getReplicaId(); + RegionLocations oldLocs = metaRegionLocations.get(); + if (oldLocs == null) { + RegionLocations newLocs = createRegionLocations(loc); + if (metaRegionLocations.compareAndSet(null, newLocs)) { + return; } - }); + } + HRegionLocation oldLoc = oldLocs.getRegionLocation(replicaId); + if (oldLoc != null && (oldLoc.getSeqNum() > loc.getSeqNum() || + oldLoc.getServerName().equals(loc.getServerName()))) { + return; + } + RegionLocations newLocs = replaceRegionLocation(oldLocs, loc); + if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) { + return; + } + } + } + + private void removeLocationFromCache(HRegionLocation loc) { + for (;;) { + RegionLocations oldLocs = metaRegionLocations.get(); + if (oldLocs == null) { + return; + } + HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); + if (!canUpdateOnError(loc, oldLoc)) { + return; + } + RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); + if (metaRegionLocations.compareAndSet(oldLocs, newLocs)) { + return; + } + } + } + + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { + AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation, + this::addLocationToCache, this::removeLocationFromCache); } void clearCache() { - metaRegionLocation.set(null); + metaRegionLocations.set(null); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 7e3d56c690..1fcfbb024d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -20,6 
+20,11 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.NINES; import static org.apache.hadoop.hbase.HConstants.ZEROES; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.mergeRegionLocations; +import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName; @@ -39,7 +44,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.commons.lang3.ObjectUtils; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -53,6 +60,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Objects; /** * The asynchronous locator for regions other than meta. 
@@ -83,9 +91,9 @@ class AsyncNonMetaRegionLocator { private static final class LocateRequest { - public final byte[] row; + private final byte[] row; - public final RegionLocateType locateType; + private final RegionLocateType locateType; public LocateRequest(byte[] row, RegionLocateType locateType) { this.row = row; @@ -109,12 +117,12 @@ class AsyncNonMetaRegionLocator { private static final class TableCache { - public final ConcurrentNavigableMap cache = + private final ConcurrentNavigableMap cache = new ConcurrentSkipListMap<>(BYTES_COMPARATOR); - public final Set pendingRequests = new HashSet<>(); + private final Set pendingRequests = new HashSet<>(); - public final Map> allRequests = + private final Map> allRequests = new LinkedHashMap<>(); public boolean hasQuota(int max) { @@ -133,25 +141,29 @@ class AsyncNonMetaRegionLocator { return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); } - public void clearCompletedRequests(Optional location) { - for (Iterator>> iter = + public void clearCompletedRequests(Optional locations) { + for (Iterator>> iter = allRequests.entrySet().iterator(); iter.hasNext();) { - Map.Entry> entry = iter.next(); - if (tryComplete(entry.getKey(), entry.getValue(), location)) { + Map.Entry> entry = iter.next(); + if (tryComplete(entry.getKey(), entry.getValue(), locations)) { iter.remove(); } } } - private boolean tryComplete(LocateRequest req, CompletableFuture future, - Optional location) { + private boolean tryComplete(LocateRequest req, CompletableFuture future, + Optional locations) { if (future.isDone()) { return true; } - if (!location.isPresent()) { + if (!locations.isPresent()) { return false; } - HRegionLocation loc = location.get(); + RegionLocations locs = locations.get(); + HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations()); + // we should at least have one location available, otherwise the request should fail and + // should not arrive here + assert loc != null; boolean completed; 
if (req.locateType.equals(RegionLocateType.BEFORE)) { // for locating the row before current row, the common case is to find the previous region @@ -166,7 +178,7 @@ class AsyncNonMetaRegionLocator { completed = loc.getRegion().containsRow(req.row); } if (completed) { - future.complete(loc); + future.complete(locs); return true; } else { return false; @@ -186,59 +198,59 @@ class AsyncNonMetaRegionLocator { return computeIfAbsent(cache, tableName, TableCache::new); } - private void removeFromCache(HRegionLocation loc) { - TableCache tableCache = cache.get(loc.getRegion().getTable()); - if (tableCache == null) { - return; + private boolean isEqual(RegionLocations locs1, RegionLocations locs2) { + HRegionLocation[] locArr1 = locs1.getRegionLocations(); + HRegionLocation[] locArr2 = locs2.getRegionLocations(); + if (locArr1.length != locArr2.length) { + return false; } - tableCache.cache.computeIfPresent(loc.getRegion().getStartKey(), (k, oldLoc) -> { - if (oldLoc.getSeqNum() > loc.getSeqNum() || - !oldLoc.getServerName().equals(loc.getServerName())) { - return oldLoc; + for (int i = 0; i < locArr1.length; i++) { + // do not need to compare region info + HRegionLocation loc1 = locArr1[i]; + HRegionLocation loc2 = locArr2[i]; + if (loc1 == null) { + if (loc2 != null) { + return false; + } + } else { + if (loc2 == null) { + return false; + } + if (loc1.getSeqNum() != loc2.getSeqNum()) { + return false; + } + if (Objects.equal(loc1.getServerName(), loc2.getServerName())) { + return false; + } } - return null; - }); + } + return true; } // return whether we add this loc to cache - private boolean addToCache(TableCache tableCache, HRegionLocation loc) { - if (LOG.isTraceEnabled()) { - LOG.trace("Try adding " + loc + " to cache"); - } - byte[] startKey = loc.getRegion().getStartKey(); - HRegionLocation oldLoc = tableCache.cache.putIfAbsent(startKey, loc); - if (oldLoc == null) { - return true; - } - if (oldLoc.getSeqNum() > loc.getSeqNum() || - 
oldLoc.getServerName().equals(loc.getServerName())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldLoc + - " is newer than us or has the same server name"); - } - return false; - } - return loc == tableCache.cache.compute(startKey, (k, oldValue) -> { - if (oldValue == null || oldValue.getSeqNum() <= loc.getSeqNum()) { - return loc; + private boolean addToCache(TableCache tableCache, RegionLocations locs) { + LOG.trace("Try adding {} to cache", locs); + byte[] startKey = locs.getDefaultRegionLocation().getRegion().getStartKey(); + for (;;) { + RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs); + if (oldLocs == null) { + return true; } - if (LOG.isTraceEnabled()) { - LOG.trace("Will not add " + loc + " to cache because the old value " + oldValue + + RegionLocations mergedLocs = mergeRegionLocations(locs, oldLocs); + if (isEqual(mergedLocs, oldLocs)) { + // the merged one is the same with the old one, give up + LOG.trace("Will not add {} to cache because the old value {} " + " is newer than us or has the same server name." 
+ - " Maybe it is updated before we replace it"); + " Maybe it is updated before we replace it", locs, oldLocs); + return false; } - return oldValue; - }); - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "Called by lambda expression") - private void addToCache(HRegionLocation loc) { - addToCache(getTableCache(loc.getRegion().getTable()), loc); - LOG.trace("Try adding {} to cache", loc); + if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) { + return true; + } + } } - private void complete(TableName tableName, LocateRequest req, HRegionLocation loc, + private void complete(TableName tableName, LocateRequest req, RegionLocations locs, Throwable error) { if (error != null) { LOG.warn("Failed to locate region in '" + tableName + "', row='" + @@ -246,8 +258,8 @@ class AsyncNonMetaRegionLocator { } Optional toSend = Optional.empty(); TableCache tableCache = getTableCache(tableName); - if (loc != null) { - if (!addToCache(tableCache, loc)) { + if (locs != null) { + if (!addToCache(tableCache, locs)) { // someone is ahead of us. synchronized (tableCache) { tableCache.pendingRequests.remove(req); @@ -269,7 +281,7 @@ class AsyncNonMetaRegionLocator { future.completeExceptionally(error); } } - tableCache.clearCompletedRequests(Optional.ofNullable(loc)); + tableCache.clearCompletedRequests(Optional.ofNullable(locs)); // Remove a complete locate request in a synchronized block, so the table cache must have // quota to send a candidate request. toSend = tableCache.getCandidate(); @@ -286,9 +298,11 @@ class AsyncNonMetaRegionLocator { Bytes.toStringBinary(req.row), req.locateType, locs); } + // the default region location should always be presented when fetching from meta, otherwise + // let's fail the request. 
if (locs == null || locs.getDefaultRegionLocation() == null) { complete(tableName, req, null, - new IOException(String.format("No location found for '%s', row='%s', locateType=%s", + new HBaseIOException(String.format("No location found for '%s', row='%s', locateType=%s", tableName, Bytes.toStringBinary(req.row), req.locateType))); return true; } @@ -296,58 +310,60 @@ class AsyncNonMetaRegionLocator { RegionInfo info = loc.getRegion(); if (info == null) { complete(tableName, req, null, - new IOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", + new HBaseIOException(String.format("HRegionInfo is null for '%s', row='%s', locateType=%s", tableName, Bytes.toStringBinary(req.row), req.locateType))); return true; } if (info.isSplitParent()) { return false; } - if (loc.getServerName() == null) { - complete(tableName, req, null, - new IOException( - String.format("No server address listed for region '%s', row='%s', locateType=%s", - info.getRegionNameAsString(), Bytes.toStringBinary(req.row), req.locateType))); - return true; - } - complete(tableName, req, loc, null); + complete(tableName, req, locs, null); return true; } - private HRegionLocation locateRowInCache(TableCache tableCache, TableName tableName, byte[] row) { - Map.Entry entry = tableCache.cache.floorEntry(row); + private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row, + int replicaId) { + Map.Entry entry = tableCache.cache.floorEntry(row); if (entry == null) { return null; } - HRegionLocation loc = entry.getValue(); + RegionLocations locs = entry.getValue(); + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + return null; + } byte[] endKey = loc.getRegion().getEndKey(); if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + - Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.CURRENT); 
+ LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, + Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId); } - return loc; + return locs; } else { return null; } } - private HRegionLocation locateRowBeforeInCache(TableCache tableCache, TableName tableName, - byte[] row) { + private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName, + byte[] row, int replicaId) { boolean isEmptyStopRow = isEmptyStopRow(row); - Map.Entry entry = - isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); + Map.Entry entry = + isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row); if (entry == null) { return null; } - HRegionLocation loc = entry.getValue(); + RegionLocations locs = entry.getValue(); + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + return null; + } if (isEmptyStopRow(loc.getRegion().getEndKey()) || (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) { if (LOG.isTraceEnabled()) { - LOG.trace("Found " + loc + " in cache for '" + tableName + "', row='" + - Bytes.toStringBinary(row) + "', locateType=" + RegionLocateType.BEFORE); + LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName, + Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId); } - return loc; + return locs; } else { return null; } @@ -390,8 +406,8 @@ class AsyncNonMetaRegionLocator { if (tableNotFound) { complete(tableName, req, null, new TableNotFoundException(tableName)); } else if (!completeNormally) { - complete(tableName, req, null, new IOException( - "Unable to find region for " + Bytes.toStringBinary(req.row) + " in " + tableName)); + complete(tableName, req, null, new IOException("Unable to find region for '" + + Bytes.toStringBinary(req.row) + "' in " + tableName)); } } @@ -423,13 +439,12 @@ class AsyncNonMetaRegionLocator { continue; } RegionInfo info = 
loc.getRegion(); - if (info == null || info.isOffline() || info.isSplitParent() || - loc.getServerName() == null) { + if (info == null || info.isOffline() || info.isSplitParent()) { continue; } - if (addToCache(tableCache, loc)) { + if (addToCache(tableCache, locs)) { synchronized (tableCache) { - tableCache.clearCompletedRequests(Optional.of(loc)); + tableCache.clearCompletedRequests(Optional.of(locs)); } } } @@ -438,36 +453,36 @@ class AsyncNonMetaRegionLocator { }); } - private HRegionLocation locateInCache(TableCache tableCache, TableName tableName, byte[] row, - RegionLocateType locateType) { + private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row, + int replicaId, RegionLocateType locateType) { return locateType.equals(RegionLocateType.BEFORE) - ? locateRowBeforeInCache(tableCache, tableName, row) - : locateRowInCache(tableCache, tableName, row); + ? locateRowBeforeInCache(tableCache, tableName, row, replicaId) + : locateRowInCache(tableCache, tableName, row, replicaId); } // locateToPrevious is true means we will use the start key of a region to locate the region // placed before it. Used for reverse scan. See the comment of // AsyncRegionLocator.getPreviousRegionLocation. 
- private CompletableFuture getRegionLocationInternal(TableName tableName, - byte[] row, RegionLocateType locateType, boolean reload) { + private CompletableFuture getRegionLocationsInternal(TableName tableName, + byte[] row, int replicaId, RegionLocateType locateType, boolean reload) { // AFTER should be convert to CURRENT before calling this method assert !locateType.equals(RegionLocateType.AFTER); TableCache tableCache = getTableCache(tableName); if (!reload) { - HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); - if (loc != null) { - return CompletableFuture.completedFuture(loc); + RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); + if (isGood(locs, replicaId)) { + return CompletableFuture.completedFuture(locs); } } - CompletableFuture future; + CompletableFuture future; LocateRequest req; boolean sendRequest = false; synchronized (tableCache) { // check again if (!reload) { - HRegionLocation loc = locateInCache(tableCache, tableName, row, locateType); - if (loc != null) { - return CompletableFuture.completedFuture(loc); + RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType); + if (isGood(locs, replicaId)) { + return CompletableFuture.completedFuture(locs); } } req = new LocateRequest(row, locateType); @@ -487,28 +502,58 @@ class AsyncNonMetaRegionLocator { return future; } - CompletableFuture getRegionLocation(TableName tableName, byte[] row, - RegionLocateType locateType, boolean reload) { - if (locateType.equals(RegionLocateType.BEFORE)) { - return getRegionLocationInternal(tableName, row, locateType, reload); - } else { - // as we know the exact row after us, so we can just create the new row, and use the same - // algorithm to locate it. 
- if (locateType.equals(RegionLocateType.AFTER)) { - row = createClosestRowAfter(row); - } - return getRegionLocationInternal(tableName, row, RegionLocateType.CURRENT, reload); + CompletableFuture getRegionLocations(TableName tableName, byte[] row, + int replicaId, RegionLocateType locateType, boolean reload) { + // as we know the exact row after us, so we can just create the new row, and use the same + // algorithm to locate it. + if (locateType.equals(RegionLocateType.AFTER)) { + row = createClosestRowAfter(row); + locateType = RegionLocateType.CURRENT; } + return getRegionLocationsInternal(tableName, row, replicaId, locateType, reload); } - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - AsyncRegionLocator.updateCachedLocation(loc, exception, l -> { - TableCache tableCache = cache.get(l.getRegion().getTable()); - if (tableCache == null) { - return null; + private void removeLocationFromCache(HRegionLocation loc) { + TableCache tableCache = cache.get(loc.getRegion().getTable()); + if (tableCache == null) { + return; + } + byte[] startKey = loc.getRegion().getStartKey(); + for (;;) { + RegionLocations oldLocs = tableCache.cache.get(startKey); + HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId()); + if (!canUpdateOnError(loc, oldLoc)) { + return; } - return tableCache.cache.get(l.getRegion().getStartKey()); - }, this::addToCache, this::removeFromCache); + RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId()); + if (newLocs == null) { + if (tableCache.cache.remove(startKey, oldLocs)) { + return; + } + } else { + if (tableCache.cache.replace(startKey, oldLocs, newLocs)) { + return; + } + } + } + } + + private void addLocationToCache(HRegionLocation loc) { + addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc)); + } + + private HRegionLocation getCachedLocation(HRegionLocation loc) { + TableCache tableCache = cache.get(loc.getRegion().getTable()); + if 
(tableCache == null) { + return null; + } + RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey()); + return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null; + } + + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { + AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation, + this::addLocationToCache, this::removeLocationFromCache); } void clearCache(TableName tableName) { @@ -526,11 +571,11 @@ class AsyncNonMetaRegionLocator { // only used for testing whether we have cached the location for a region. @VisibleForTesting - HRegionLocation getRegionLocationInCache(TableName tableName, byte[] row) { + RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { TableCache tableCache = cache.get(tableName); if (tableCache == null) { return null; } - return locateRowInCache(tableCache, tableName, row); + return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 56228abe16..d624974ecc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -18,26 +18,24 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; -import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; -import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; -import org.apache.hbase.thirdparty.io.netty.util.Timeout; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; - +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.exceptions.RegionMovedException; -import org.apache.hadoop.hbase.exceptions.TimeoutIOException; -import org.apache.hadoop.hbase.util.Bytes; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timeout; /** * The asynchronous region locator. @@ -59,8 +57,8 @@ class AsyncRegionLocator { this.retryTimer = retryTimer; } - private CompletableFuture withTimeout(CompletableFuture future, - long timeoutNs, Supplier timeoutMsg) { + private CompletableFuture withTimeout(CompletableFuture future, long timeoutNs, + Supplier timeoutMsg) { if (future.isDone() || timeoutNs <= 0) { return future; } @@ -78,74 +76,75 @@ class AsyncRegionLocator { }); } - CompletableFuture getRegionLocation(TableName tableName, byte[] row, + private boolean isMeta(TableName tableName) { + return TableName.isMetaTableName(tableName); + } + + CompletableFuture getRegionLocations(TableName tableName, byte[] row, RegionLocateType type, boolean reload, long timeoutNs) { + CompletableFuture future = isMeta(tableName) + ? 
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, + RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); + return withTimeout(future, timeoutNs, + () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + + "ms) waiting for region locations for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "'"); + } + + CompletableFuture getRegionLocation(TableName tableName, byte[] row, + int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { // meta region can not be split right now so we always call the same method. // Change it later if the meta table can have more than one regions. - CompletableFuture future = - tableName.equals(META_TABLE_NAME) ? metaRegionLocator.getRegionLocation(reload) - : nonMetaRegionLocator.getRegionLocation(tableName, row, type, reload); + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture locsFuture = + isMeta(tableName) ? metaRegionLocator.getRegionLocations(replicaId, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); + addListener(locsFuture, (locs, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + if (loc == null) { + future + .completeExceptionally(new RegionException("No location for " + tableName + ", row='" + + Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId)); + } else if (loc.getServerName() == null) { + future.completeExceptionally(new HBaseIOException("No server address listed for region '" + + loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row) + + "', locateType=" + type + ", replicaId=" + replicaId)); + } else { + future.complete(loc); + } + }); return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + - "ms) waiting for region location for " + 
tableName + ", row='" + - Bytes.toStringBinary(row) + "'"); + "ms) waiting for region location for " + tableName + ", row='" + Bytes.toStringBinary(row) + + "', replicaId=" + replicaId); } CompletableFuture getRegionLocation(TableName tableName, byte[] row, - RegionLocateType type, long timeoutNs) { - return getRegionLocation(tableName, row, type, false, timeoutNs); + int replicaId, RegionLocateType type, long timeoutNs) { + return getRegionLocation(tableName, row, replicaId, type, false, timeoutNs); } - static boolean canUpdate(HRegionLocation loc, HRegionLocation oldLoc) { - // Do not need to update if no such location, or the location is newer, or the location is not - // same with us - return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() && - oldLoc.getServerName().equals(loc.getServerName()); + CompletableFuture getRegionLocation(TableName tableName, byte[] row, + RegionLocateType type, boolean reload, long timeoutNs) { + return getRegionLocation(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload, + timeoutNs); } - static void updateCachedLocation(HRegionLocation loc, Throwable exception, - Function cachedLocationSupplier, - Consumer addToCache, Consumer removeFromCache) { - HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); - if (LOG.isDebugEnabled()) { - LOG.debug("Try updating " + loc + ", the old value is " + oldLoc, exception); - } - if (!canUpdate(loc, oldLoc)) { - return; - } - Throwable cause = findException(exception); - if (LOG.isDebugEnabled()) { - LOG.debug("The actual exception when updating " + loc, cause); - } - if (cause == null || !isMetaClearingException(cause)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Will not update " + loc + " because the exception is null or not the one we care about"); - } - return; - } - if (cause instanceof RegionMovedException) { - RegionMovedException rme = (RegionMovedException) cause; - HRegionLocation newLoc = - new HRegionLocation(loc.getRegionInfo(), rme.getServerName(), 
rme.getLocationSeqNum()); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Try updating " + loc + " with the new location " + newLoc + " constructed by " + rme); - } - addToCache.accept(newLoc); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Try removing " + loc + " from cache"); - } - removeFromCache.accept(loc); - } + CompletableFuture getRegionLocation(TableName tableName, byte[] row, + RegionLocateType type, long timeoutNs) { + return getRegionLocation(tableName, row, type, false, timeoutNs); } - void updateCachedLocation(HRegionLocation loc, Throwable exception) { + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { if (loc.getRegion().isMetaRegion()) { - metaRegionLocator.updateCachedLocation(loc, exception); + metaRegionLocator.updateCachedLocationOnError(loc, exception); } else { - nonMetaRegionLocator.updateCachedLocation(loc, exception); + nonMetaRegionLocator.updateCachedLocationOnError(loc, exception); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java new file mode 100644 index 0000000000..5c9c09154e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findException; +import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException; + +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for asynchronous region locator. 
+ */ +@InterfaceAudience.Private +final class AsyncRegionLocatorHelper { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncRegionLocatorHelper.class); + + private AsyncRegionLocatorHelper() { + } + + static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) { + // Do not need to update if no such location, or the location is newer, or the location is not + // the same with us + return oldLoc != null && oldLoc.getSeqNum() <= loc.getSeqNum() && + oldLoc.getServerName().equals(loc.getServerName()); + } + + static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception, + Function cachedLocationSupplier, + Consumer addToCache, Consumer removeFromCache) { + HRegionLocation oldLoc = cachedLocationSupplier.apply(loc); + LOG.debug("Try updating {} , the old value is {}", loc, oldLoc, exception); + if (!canUpdateOnError(loc, oldLoc)) { + return; + } + Throwable cause = findException(exception); + LOG.debug("The actual exception when updating {}", loc, cause); + if (cause == null || !isMetaClearingException(cause)) { + LOG.debug("Will not update {} because the exception is null or not the one we care about", + loc); + return; + } + if (cause instanceof RegionMovedException) { + RegionMovedException rme = (RegionMovedException) cause; + HRegionLocation newLoc = + new HRegionLocation(loc.getRegion(), rme.getServerName(), rme.getLocationSeqNum()); + LOG.debug("Try updating {} with the new location {} constructed by {}", loc, newLoc, rme); + addToCache.accept(newLoc); + } else { + LOG.debug("Try removing {} from cache", loc); + removeFromCache.accept(loc); + } + } + + static RegionLocations createRegionLocations(HRegionLocation loc) { + int replicaId = loc.getRegion().getReplicaId(); + HRegionLocation[] locs = new HRegionLocation[replicaId + 1]; + locs[replicaId] = loc; + return new RegionLocations(locs); + } + + /** + * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and replace the + * location for 
the given {@code replicaId} with the given {@code loc}. + *

+ * All the {@link RegionLocations} in async locator related class are immutable because we want to + * access them concurrently, so here we need to create a new one, instead of calling + * {@link RegionLocations#updateLocation(HRegionLocation, boolean, boolean)}. + */ + static RegionLocations replaceRegionLocation(RegionLocations oldLocs, HRegionLocation loc) { + int replicaId = loc.getRegion().getReplicaId(); + HRegionLocation[] locs = oldLocs.getRegionLocations(); + locs = Arrays.copyOf(locs, Math.max(replicaId + 1, locs.length)); + locs[replicaId] = loc; + return new RegionLocations(locs); + } + + /** + * Create a new {@link RegionLocations} based on the given {@code oldLocs}, and remove the + * location for the given {@code replicaId}. + *

+ * All the {@link RegionLocations} in async locator related class are immutable because we want to + * access them concurrently, so here we need to create a new one, instead of calling + * {@link RegionLocations#remove(int)}. + */ + static RegionLocations removeRegionLocation(RegionLocations oldLocs, int replicaId) { + HRegionLocation[] locs = oldLocs.getRegionLocations(); + if (locs.length < replicaId + 1) { + // Here we do not modify the oldLocs so it is safe to return it. + return oldLocs; + } + locs = Arrays.copyOf(locs, locs.length); + locs[replicaId] = null; + if (ObjectUtils.firstNonNull(locs) != null) { + return new RegionLocations(locs); + } else { + // if all the locations are null, just return null + return null; + } + } + + /** + * Create a new {@link RegionLocations} which is the merging result for the given two + * {@link RegionLocations}. + *

+ * All the {@link RegionLocations} in async locator related class are immutable because we want to + * access them concurrently, so here we need to create a new one, instead of calling + * {@link RegionLocations#mergeLocations(RegionLocations)} directly. + */ + static RegionLocations mergeRegionLocations(RegionLocations newLocs, RegionLocations oldLocs) { + RegionLocations locs = new RegionLocations(newLocs.getRegionLocations()); + locs.mergeLocations(oldLocs); + return locs; + } + + static boolean isGood(RegionLocations locs, int replicaId) { + if (locs == null) { + return false; + } + HRegionLocation loc = locs.getRegionLocation(replicaId); + return loc != null && loc.getServerName() != null; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index d30012f4bc..e03049a52b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -88,15 +88,15 @@ public abstract class AsyncRpcRetryingCaller { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); } - protected long remainingTimeNs() { + protected final long remainingTimeNs() { return operationTimeoutNs - (System.nanoTime() - startNs); } - protected void completeExceptionally() { + protected final void completeExceptionally() { future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions)); } - protected void resetCallTimeout() { + protected final void resetCallTimeout() { long callTimeoutNs; if (operationTimeoutNs > 0) { callTimeoutNs = remainingTimeNs(); @@ -111,8 +111,15 @@ public abstract class AsyncRpcRetryingCaller { resetController(controller, callTimeoutNs); } - protected void onError(Throwable error, Supplier errMsg, + protected final void onError(Throwable error, Supplier errMsg, Consumer 
updateCachedLocation) { + if (future.isDone()) { + // Give up if the future is already done, this is possible if user has already canceled the + // future. And for timeline consistent read, we will also cancel some requests if we have + // already got one of the responses. + LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled()); + return; + } error = translateException(error); if (error instanceof DoNotRetryIOException) { future.completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index f80b4e5c27..a660e7457e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -17,22 +17,22 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + + import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; @@ -75,6 +75,8 @@ class AsyncRpcRetryingCallerFactory { private RegionLocateType locateType = RegionLocateType.CURRENT; + private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + public SingleRequestCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -121,11 +123,17 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SingleRequestCallerBuilder replicaId(int replicaId) { + this.replicaId = replicaId; + return this; + } + public AsyncSingleRequestRpcRetryingCaller build() { + checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, - checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), - checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), - pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId, + checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), + pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -241,11 +249,11 @@ class AsyncRpcRetryingCallerFactory { public AsyncScanSingleRegionRpcRetryingCaller build() { checkArgument(scannerId != null, "invalid scannerId %d", scannerId); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, - checkNotNull(scan, "scan is null"), scanMetrics, scannerId, - checkNotNull(resultCache, "resultCache is null"), - checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, - pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(scan, "scan is 
null"), scanMetrics, scannerId, + checkNotNull(resultCache, "resultCache is null"), + checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), + checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, + pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -311,7 +319,7 @@ class AsyncRpcRetryingCallerFactory { public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, - maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } public List> call() { @@ -363,8 +371,8 @@ class AsyncRpcRetryingCallerFactory { public AsyncMasterRequestRpcRetryingCaller build() { return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, - checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** @@ -390,7 +398,8 @@ class AsyncRpcRetryingCallerFactory { private ServerName serverName; - public AdminRequestCallerBuilder action(AsyncAdminRequestRetryingCaller.Callable callable) { + public AdminRequestCallerBuilder action( + AsyncAdminRequestRetryingCaller.Callable callable) { this.callable = callable; return this; } @@ -420,15 +429,15 @@ class AsyncRpcRetryingCallerFactory { return this; } - public AdminRequestCallerBuilder serverName(ServerName serverName){ + public AdminRequestCallerBuilder serverName(ServerName serverName) { this.serverName = serverName; return this; } public AsyncAdminRequestRetryingCaller build() { return new AsyncAdminRequestRetryingCaller(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, - "serverName is null"), checkNotNull(callable, "action is null")); + 
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } public CompletableFuture call() { @@ -436,7 +445,7 @@ class AsyncRpcRetryingCallerFactory { } } - public AdminRequestCallerBuilder adminRequest(){ + public AdminRequestCallerBuilder adminRequest() { return new AdminRequestCallerBuilder<>(); } @@ -488,8 +497,8 @@ class AsyncRpcRetryingCallerFactory { public AsyncServerRequestRpcRetryingCaller build() { return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, - "serverName is null"), checkNotNull(callable, "action is null")); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } public CompletableFuture call() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 56c82fb4de..1a52e5cc6a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -17,17 +17,19 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.concurrent.CompletableFuture; - import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.util.Bytes; +import 
org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; /** * Retry caller for a single request, such as get, put, delete, etc. @@ -45,18 +47,21 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { private final byte[] row; + private final int replicaId; + private final RegionLocateType locateType; private final Callable callable; public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, byte[] row, RegionLocateType locateType, Callable callable, - long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt) { + TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, + Callable callable, long pauseNs, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + startLogErrorsCnt); this.tableName = tableName; this.row = row; + this.replicaId = replicaId; this.locateType = locateType; this.callable = callable; } @@ -67,23 +72,22 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { stub = conn.getRegionServerStub(loc.getServerName()); } catch (IOException e) { onError(e, - () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) - + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed", - err -> conn.getLocator().updateCachedLocation(loc, err)); + () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + + "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocationOnError(loc, err)); return; } resetCallTimeout(); - callable.call(controller, loc, stub).whenComplete( - (result, 
error) -> { - if (error != null) { - onError(error, - () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " - + loc.getRegion().getEncodedName() + " of " + tableName + " failed", - err -> conn.getLocator().updateCachedLocation(loc, err)); - return; - } - future.complete(result); - }); + callable.call(controller, loc, stub).whenComplete((result, error) -> { + if (error != null) { + onError(error, + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocationOnError(loc, err)); + return; + } + future.complete(result); + }); } @Override @@ -98,18 +102,17 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { } else { locateTimeoutNs = -1L; } - conn.getLocator() - .getRegionLocation(tableName, row, locateType, locateTimeoutNs) - .whenComplete( - (loc, error) -> { - if (error != null) { - onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName - + " failed", err -> { - }); - return; - } - call(loc); - }); + addListener( + conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs), + (loc, error) -> { + if (error != null) { + onError(error, + () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> { + }); + return; + } + call(loc); + }); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java index dbfcef51a0..3bda38e926 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocator.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.client; import java.util.concurrent.CompletableFuture; - import 
org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -55,5 +54,30 @@ public interface AsyncTableRegionLocator { * @param row Row to find. * @param reload true to reload information or false to use cached information */ - CompletableFuture getRegionLocation(byte[] row, boolean reload); + default CompletableFuture getRegionLocation(byte[] row, boolean reload) { + return getRegionLocation(row, RegionReplicaUtil.DEFAULT_REPLICA_ID, reload); + } + + /** + * Finds the region with the given replicaId on which the given row is being served. + *

+ * Returns the location of the region with the given replicaId to which the row + * belongs. + * @param row Row to find. + * @param replicaId the replica id of the region + */ + default CompletableFuture getRegionLocation(byte[] row, int replicaId) { + return getRegionLocation(row, replicaId, false); + } + + /** + * Finds the region with the given replicaId on which the given row is being served. + *

+ * Returns the location of the region with the given replicaId to which the row + * belongs. + * @param row Row to find. + * @param replicaId the replica id of the region + * @param reload true to reload information or false to use cached information + */ + CompletableFuture getRegionLocation(byte[] row, int replicaId, boolean reload); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java index 7d199df319..465a411f08 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableRegionLocatorImpl.java @@ -44,7 +44,9 @@ class AsyncTableRegionLocatorImpl implements AsyncTableRegionLocator { } @Override - public CompletableFuture getRegionLocation(byte[] row, boolean reload) { - return locator.getRegionLocation(tableName, row, RegionLocateType.CURRENT, reload, -1L); + public CompletableFuture getRegionLocation(byte[] row, int replicaId, + boolean reload) { + return locator.getRegionLocation(tableName, row, replicaId, RegionLocateType.CURRENT, reload, + -1L); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index d99600426d..55c62e70c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -38,6 +38,9 @@ public class ConnectionConfiguration { public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760; + public static final String 
PRIMARY_CALL_TIMEOUT_MICROSECOND = + "hbase.client.primaryCallTimeout.get"; + public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; @@ -86,7 +89,7 @@ public class ConnectionConfiguration { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); this.primaryCallTimeoutMicroSecond = - conf.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10ms + conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT); this.replicaCallTimeoutMicroSecondScan = conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index d705d7c4f3..28db7e86e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.RpcChannel; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType /** * The implementation of RawAsyncTable. - *

+ *

* The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will * be finished inside the rpc framework thread, which means that the callbacks registered to the * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use @@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType @InterfaceAudience.Private class RawAsyncTableImpl implements AsyncTable { + private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); + private final AsyncConnectionImpl conn; private final TableName tableName; @@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable { private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { return conn.callerFactory. single().table(tableName).row(row) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt); + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); } private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { return newCaller(row.getRow(), rpcTimeoutNs); } + private CompletableFuture get(Get get, int replicaId, long timeoutNs) { + return this. newCaller(get, timeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl + . call(controller, loc, stub, get, + RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), + (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) + .replicaId(replicaId).call(); + } + + // Connect the two futures, if the src future is done, then mark the dst future as done. And if + // the dst future is done, then cancel the src future. This is used for timeline consistent read. 
+ private void connect(CompletableFuture srcFuture, CompletableFuture dstFuture) { + addListener(srcFuture, (r, e) -> { + if (e != null) { + dstFuture.completeExceptionally(e); + } else { + dstFuture.complete(r); + } + }); + // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. + // Notice that this is a bit tricky, as the execution chain may be 'complete src -> complete dst + // -> cancel src', for now it seems to be fine, as they will use CAS to set the result first in + // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the + // tie. + addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); + } + + private void timelineConsistentGet(Get get, RegionLocations locs, + CompletableFuture future) { + if (future.isDone()) { + // do not send requests to secondary replicas if the future is done, i.e, the primary request + // has already been finished. + return; + } + for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { + CompletableFuture secondaryFuture = get(get, replicaId, readRpcTimeoutNs); + connect(secondaryFuture, future); + } + } + @Override public CompletableFuture get(Get get) { - return this. newCaller(get, readRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl - . 
call(controller, loc, stub, get, - RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), - (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) - .call(); + CompletableFuture primaryFuture = + get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); + if (get.getConsistency() == Consistency.STRONG) { + return primaryFuture; + } + // Timeline consistent read, where we will send requests to other region replicas + CompletableFuture future = new CompletableFuture<>(); + connect(primaryFuture, future); + long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs(); + long startNs = System.nanoTime(); + addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(), + RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> { + if (error != null) { + LOG.warn( + "Failed to locate all the replicas for table={}, row='{}'," + + " give up timeline consistent read", + tableName, Bytes.toStringBinary(get.getRow()), error); + return; + } + if (locs.size() <= 1) { + LOG.warn( + "There are no secondary replicas for region {}," + " give up timeline consistent read", + locs.getDefaultRegionLocation().getRegion()); + return; + } + long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); + if (delayNs <= 0) { + timelineConsistentGet(get, locs, future); + } else { + AsyncConnectionImpl.RETRY_TIMER.newTimeout( + timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS); + } + }); + return future; } @Override public CompletableFuture put(Put put) { return this. newCaller(put, writeRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, - put, RequestConverter::buildMutateRequest)) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, + put, RequestConverter::buildMutateRequest)) + .call(); } @Override public CompletableFuture delete(Delete delete) { return this. 
newCaller(delete, writeRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, - stub, delete, RequestConverter::buildMutateRequest)) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, + stub, delete, RequestConverter::buildMutateRequest)) + .call(); } @Override public CompletableFuture append(Append append) { checkHasFamilies(append); return this. newCaller(append, rpcTimeoutNs) - .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, - append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) - .call(); + .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, + append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .call(); } @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); return this. newCaller(increment, rpcTimeoutNs) - .action((controller, loc, stub) -> this. noncedMutate(controller, loc, - stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) - .call(); + .action((controller, loc, stub) -> this. noncedMutate(controller, loc, + stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .call(); } private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { @@ -313,36 +386,36 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture thenPut(Put put) { preCheck(); return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, - loc, stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), - (c, r) -> r.getProcessed())) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl. 
mutate(controller, loc, + stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), + (c, r) -> r.getProcessed())) + .call(); } @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), - (c, r) -> r.getProcessed())) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, + loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), + (c, r) -> r.getProcessed())) + .call(); } @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); return RawAsyncTableImpl.this. newCaller(mutation, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, - stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), - resp -> resp.getExists())) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, + stub, mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), + resp -> resp.getExists())) + .call(); } } @@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable { if (ex != null) { future.completeExceptionally(ex instanceof IOException ? 
ex : new IOException( - "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); + "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); } else { future.complete(respConverter - .apply((Result) multiResp.getResults().get(regionName).result.get(0))); + .apply((Result) multiResp.getResults().get(regionName).result.get(0))); } } catch (IOException e) { future.completeExceptionally(e); @@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - }, resp -> null)).call(); + }, resp -> null)) + .call(); } private Scan setDefaultScanConfig(Scan scan) { @@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable { public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs, - maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); + maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public ResultScanner getScanner(Scan scan) { return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), - resultSize2CacheSize( - scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + resultSize2CacheSize( + scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); } @Override @@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable { private List> voidMutate(List actions) { return this. batch(actions, writeRpcTimeoutNs).stream() - .map(f -> f. thenApply(r -> null)).collect(toList()); + .map(f -> f. 
thenApply(r -> null)).collect(toList()); } private List> batch(List actions, long rpcTimeoutNs) { return conn.callerFactory.batch().table(tableName).actions(actions) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); } @Override @@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable { private CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, - region, row, rpcTimeoutNs, operationTimeoutNs); + region, row, rpcTimeoutNs, operationTimeoutNs); S stub = stubMaker.apply(channel); CompletableFuture future = new CompletableFuture<>(); ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); @@ -553,10 +627,9 @@ class RawAsyncTableImpl implements AsyncTable { } private void onLocateComplete(Function stubMaker, - ServiceCaller callable, CoprocessorCallback callback, - List locs, byte[] endKey, boolean endKeyInclusive, - AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc, - Throwable error) { + ServiceCaller callable, CoprocessorCallback callback, List locs, + byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, + AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { if (error != null) { callback.onError(error); return; @@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable { if (locateFinished(region, endKey, endKeyInclusive)) { locateFinished.set(true); } else { - conn.getLocator() - .getRegionLocation(tableName, 
region.getEndKey(), RegionLocateType.CURRENT, - operationTimeoutNs) - .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, - endKeyInclusive, locateFinished, unfinishedRequest, l, e)); + addListener( + conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, + operationTimeoutNs), + (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, + locateFinished, unfinishedRequest, l, e)); } coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> { if (e != null) { @@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable { @Override public void execute() { - conn.getLocator().getRegionLocation(tableName, startKey, - startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs) - .whenComplete( - (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), - endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); + addListener(conn.getLocator().getRegionLocation(tableName, startKey, + startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), + (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, + endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java new file mode 100644 index 0000000000..067e66b5ec --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for processing futures. + */ +@InterfaceAudience.Private +public final class FutureUtils { + + private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class); + + private FutureUtils() { + } + + /** + * This method is used when you just want to add a listener to the given future. We will call + * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the + * {@code future}. Ignoring the return value of a Future is considered a bad practice as it may + * suppress exceptions thrown from the code that completes the future, and this method will catch + * all the exceptions thrown from the {@code action} to catch possible code bugs. + *

+ * And the error prone check will always report FutureReturnValueIgnored because every method in + * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always + * have one future that has not been checked. So we introduce this method and add a suppress + * warnings annotation here. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public static void addListener(CompletableFuture future, + BiConsumer action) { + future.whenComplete((resp, error) -> { + try { + action.accept(resp, error); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing CompletableFuture", t); + } + }); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java new file mode 100644 index 0000000000..c14f69f5eb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.util.Bytes; + +final class RegionReplicaTestHelper { + + private RegionReplicaTestHelper() { + } + + // waits for all replicas to have region location + static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry, + int regionReplication) throws IOException { + TestZKAsyncRegistry.TEST_UTIL.waitFor( + TestZKAsyncRegistry.TEST_UTIL.getConfiguration() + .getLong("hbase.client.sync.wait.timeout.msec", 60000), + 200, true, new ExplainingPredicate() { + @Override + public String explainFailure() throws IOException { + return "Not all meta replicas get assigned"; + } + + @Override + public boolean evaluate() throws IOException { + try { + RegionLocations locs = registry.getMetaRegionLocation().get(); + if (locs.size() < regionReplication) { + return false; + } + for (int i = 0; i < regionReplication; i++) { + if (locs.getRegionLocation(i) == null) { + return false; + } + } + return true; + } catch (Exception e) { + TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e); + return false; + } + } + }); + } + + static Optional getRSCarryingReplica(HBaseTestingUtility util, TableName tableName, + int replicaId) { + return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .filter(rs -> rs.getRegions(tableName).stream() + .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId)) + .findAny().map(rs -> rs.getServerName()); 
+ } + + /** + * Return the new location. + */ + static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc) + throws Exception { + ServerName serverName = currentLoc.getServerName(); + RegionInfo regionInfo = currentLoc.getRegion(); + TableName tableName = regionInfo.getTable(); + int replicaId = regionInfo.getReplicaId(); + ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() + .get(); + util.getAdmin().move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(newServerName.getServerName())); + util.waitFor(30000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + Optional newServerName = getRSCarryingReplica(util, tableName, replicaId); + return newServerName.isPresent() && !newServerName.get().equals(serverName); + } + + @Override + public String explainFailure() throws Exception { + return regionInfo.getRegionNameAsString() + " is still on " + serverName; + } + }); + return newServerName; + } + + interface Locator { + RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) + throws Exception; + + void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception; + } + + static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator) + throws Exception { + RegionLocations locs = + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false); + assertEquals(3, locs.size()); + for (int i = 0; i < 3; i++) { + HRegionLocation loc = locs.getRegionLocation(i); + assertNotNull(loc); + ServerName serverName = getRSCarryingReplica(util, tableName, i).get(); + assertEquals(serverName, loc.getServerName()); + } + ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation()); + // The cached location should not be changed + 
assertEquals(locs.getDefaultRegionLocation().getServerName(), + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false) + .getDefaultRegionLocation().getServerName()); + // should get the new location when reload = true + assertEquals(newServerName, + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true) + .getDefaultRegionLocation().getServerName()); + // the cached location should be replaced + assertEquals(newServerName, + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false) + .getDefaultRegionLocation().getServerName()); + + ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1)); + ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2)); + + // The cached location should not be changed + assertEquals(locs.getRegionLocation(1).getServerName(), + locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName()); + // clear the cached location for replica 1 + locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException()); + // the cached location for replica 2 should not be changed + assertEquals(locs.getRegionLocation(2).getServerName(), + locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName()); + // should get the new location as we have cleared the old location + assertEquals(newServerName1, + locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName()); + // as we will get the new location for replica 2 at once, we should also get the new location + // for replica 2 + assertEquals(newServerName2, + locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 7c08d6db73..df1fe089ad 100644 ---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -17,20 +17,19 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator; -import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class); + HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator { @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); + TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.startMiniCluster(3); - TEST_UTIL.waitUntilAllSystemRegionsAssigned(); - 
TEST_UTIL.getAdmin().setBalancerRunning(false, true); REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3); + TEST_UTIL.getAdmin().balancerSwitch(false, true); LOCATOR = new AsyncMetaRegionLocator(REGISTRY); } @@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator { TEST_UTIL.shutdownMiniCluster(); } - private Optional getRSCarryingMeta() { - return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer()) - .filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny() - .map(rs -> rs.getServerName()); - } - @Test - public void testReload() throws Exception { - ServerName serverName = getRSCarryingMeta().get(); - assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName()); - - ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) - .findAny().get(); - TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), - Bytes.toBytes(newServerName.getServerName())); - TEST_UTIL.waitFor(30000, new ExplainingPredicate() { + public void test() throws Exception { + testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() { @Override - public boolean evaluate() throws Exception { - Optional newServerName = getRSCarryingMeta(); - return newServerName.isPresent() && !newServerName.get().equals(serverName); + public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) + throws Exception { + LOCATOR.updateCachedLocationOnError(loc, error); } @Override - public String explainFailure() throws Exception { - return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " + - serverName; + public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) + throws Exception { + return 
LOCATOR.getRegionLocations(replicaId, reload).get(); } }); - // The cached location will not change - assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName()); - // should get the new location when reload = true - assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName()); - // the cached location should be replaced - assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index 38dc78d54d..eeaf99f9ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator; import org.apache.hadoop.hbase.security.User; import 
org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class); + HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.getAdmin().balancerSwitch(false, true); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()); + registry.getClusterId().get(), User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.waitTableAvailable(TABLE_NAME); } + private CompletableFuture getDefaultRegionLocation(TableName tableName, + byte[] row, RegionLocateType locateType, boolean reload) { + return LOCATOR + .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload) + .thenApply(RegionLocations::getDefaultRegionLocation); + } + @Test public void testNoTable() throws InterruptedException { for (RegionLocateType locateType : RegionLocateType.values()) { try { - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.getAdmin().disableTable(TABLE_NAME); for (RegionLocateType locateType : RegionLocateType.values()) { try { - LOCATOR.getRegionLocation(TABLE_NAME, 
EMPTY_START_ROW, locateType, false).get(); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator { ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)]; ThreadLocalRandom.current().nextBytes(randKey); for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get()); } } @@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator { private ServerName[] getLocations(byte[][] startKeys) { ServerName[] serverNames = new ServerName[startKeys.length]; TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .forEach(rs -> { - rs.getRegions(TABLE_NAME).forEach(r -> { - serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(), - Bytes::compareTo)] = rs.getServerName(); - }); + .forEach(rs -> { + rs.getRegions(TABLE_NAME).forEach(r -> { + serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(), + Bytes::compareTo)] = rs.getServerName(); }); + }); return serverNames; } @@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator { IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? 
EMPTY_END_ROW : startKeys[i + 1], - serverNames[i], LOCATOR - .getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get()); + serverNames[i], + getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false) + .get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], serverNames[i], - LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get()); + getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator { n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { try { assertLocEquals(i == 0 ? EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i], - LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false) - .get()); + getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator { public void testRegionMove() throws IOException, InterruptedException, ExecutionException { createSingleRegionTable(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); - HRegionLocation loc = LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get(); + HRegionLocation loc = + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get(); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc); ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 
- .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) - .findAny().get(); + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() + .get(); TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()), Bytes.toBytes(newServerName.getServerName())); while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName() - .equals(newServerName)) { + .equals(newServerName)) { Thread.sleep(100); } // Should be same as it is in cache - assertSame(loc, LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); - LOCATOR.updateCachedLocation(loc, null); + assertSame(loc, + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); + LOCATOR.updateCachedLocationOnError(loc, null); // null error will not trigger a cache cleanup - assertSame(loc, LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); - LOCATOR.updateCachedLocation(loc, new NotServingRegionException()); - assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); + assertSame(loc, + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); + LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); } // usually locate after will return the same result, so we add a test to make it return different @@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey }); TEST_UTIL.waitTableAvailable(TABLE_NAME); HRegionLocation currentLoc = - LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get(); + 
getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get(); ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc); HRegionLocation afterLoc = - LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get(); + getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get(); ServerName afterServerName = - TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .filter(rs -> rs.getRegions(TABLE_NAME).stream() - .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey()))) - .findAny().get().getServerName(); + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .filter(rs -> rs.getRegions(TABLE_NAME).stream() + .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey()))) + .findAny().get().getServerName(); assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc); assertSame(afterLoc, - LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get()); + getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get()); } // For HBASE-17402 @@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator { ServerName[] serverNames = getLocations(startKeys); for (int i = 0; i < 100; i++) { LOCATOR.clearCache(TABLE_NAME); - List> futures = IntStream.range(0, 1000) - .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s)) - .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) + List> futures = + IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s)) + .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) .collect(toList()); for (int j = 0; j < 1000; j++) { int index = Math.min(8, j / 111); @@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator { 
ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) - .findAny().get(); + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() + .get(); Admin admin = TEST_UTIL.getAdmin(); RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get(); admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName())); @@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator { // The cached location will not change for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } // should get the new location when reload = true assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get()); // the cached location should be replaced for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } } @@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator { 
public void testLocateBeforeLastRegion() throws IOException, InterruptedException, ExecutionException { createMultiRegionTable(); - LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join(); + getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join(); HRegionLocation loc = - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get(); + getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get(); // should locate to the last region assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW); } + + @Test + public void testRegionReplicas() throws Exception { + TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build()); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); + testLocator(TEST_UTIL, TABLE_NAME, new Locator() { + + @Override + public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) + throws Exception { + LOCATOR.updateCachedLocationOnError(loc, error); + } + + @Override + public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) + throws Exception { + return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId, + RegionLocateType.CURRENT, reload).get(); + } + }); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java index c6624e7fcd..8cdb4a9363 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class); + HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { TEST_UTIL.getAdmin().balancerSwitch(false, true); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()); + registry.getClusterId().get(), User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) - .toArray(byte[][]::new); + .toArray(byte[][]::new); TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); TEST_UTIL.waitTableAvailable(TABLE_NAME); } @@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { TEST_UTIL.shutdownMiniCluster(); } - private void assertLocs(List> futures) + private void assertLocs(List> futures) throws InterruptedException, ExecutionException { assertEquals(256, futures.size()); for (int i = 0; i < futures.size(); i++) { - HRegionLocation loc = futures.get(i).get(); + HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation(); if (i == 0) { assertTrue(isEmptyStartRow(loc.getRegion().getStartKey())); } else { @@ -158,10 +159,11 @@ public class 
TestAsyncNonMetaRegionLocatorConcurrenyLimit { @Test public void test() throws InterruptedException, ExecutionException { - List> futures = - IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) - .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) - .collect(toList()); + List> futures = + IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) + .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID, + RegionLocateType.CURRENT, false)) + .collect(toList()); assertLocs(futures); assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(), MAX_CONCURRENCY.get() <= MAX_ALLOWED); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index a6c2efb8af..7d8956b542 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class); + HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { TEST_UTIL.waitTableAvailable(TABLE_NAME); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()); + registry.getClusterId().get(), User.getCurrent()); } @AfterClass @@ -89,8 +89,8 @@ public class 
TestAsyncSingleRequestRpcRetryingCaller { int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName()); TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes( TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName())); - AsyncTable table = CONN.getTableBuilder(TABLE_NAME) - .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build(); + AsyncTable table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS) + .setMaxRetries(30).build(); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); // move back @@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller { public void testMaxRetries() throws IOException, InterruptedException { try { CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS) - .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS) - .action((controller, loc, stub) -> failedFuture()).call().get(); + .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS) + .action((controller, loc, stub) -> failedFuture()).call().get(); fail(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); @@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller { long startNs = System.nanoTime(); try { CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS) - .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE) - .action((controller, loc, stub) -> failedFuture()).call().get(); + .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE) + .action((controller, loc, stub) -> failedFuture()).call().get(); fail(); } catch (ExecutionException e) { e.printStackTrace(); @@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller { AtomicInteger count = new AtomicInteger(0); HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); AsyncRegionLocator 
mockedLocator = - new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) { - @Override - CompletableFuture getRegionLocation(TableName tableName, byte[] row, - RegionLocateType locateType, long timeoutNs) { - if (tableName.equals(TABLE_NAME)) { - CompletableFuture future = new CompletableFuture<>(); - if (count.getAndIncrement() == 0) { - errorTriggered.set(true); - future.completeExceptionally(new RuntimeException("Inject error!")); - } else { - future.complete(loc); - } - return future; + new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) { + @Override + CompletableFuture getRegionLocation(TableName tableName, byte[] row, + int replicaId, RegionLocateType locateType, long timeoutNs) { + if (tableName.equals(TABLE_NAME)) { + CompletableFuture future = new CompletableFuture<>(); + if (count.getAndIncrement() == 0) { + errorTriggered.set(true); + future.completeExceptionally(new RuntimeException("Inject error!")); } else { - return super.getRegionLocation(tableName, row, locateType, timeoutNs); + future.complete(loc); } + return future; + } else { + return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs); } + } - @Override - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - } - }; + @Override + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { + } + }; try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(), - CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) { + CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) { @Override AsyncRegionLocator getLocator() { @@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } }) { AsyncTable table = mockedConn.getTableBuilder(TABLE_NAME) - .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build(); + .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build(); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 
assertTrue(errorTriggered.get()); errorTriggered.set(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java index 13d800095b..6c6bb98d84 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java @@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch { @Test public void test() throws InterruptedException, ExecutionException { - assertNotNull(LOCATOR - .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get()); + assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"), + RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get()); // we finish the request before we adding the remaining results to cache so sleep a bit here Thread.sleep(1000); // confirm that the locations of all the regions have been cached. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java new file mode 100644 index 0000000000..0445a0e884 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import 
org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableRegionReplicasGet { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private static AsyncConnection ASYNC_CONN; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public Supplier> getTable; + + private static AsyncTable getRawTable() { + return ASYNC_CONN.getTable(TABLE_NAME); + } + + private static AsyncTable getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncTableRegionReplicasGet::getRawTable }, + new Supplier[] { TestAsyncTableRegionReplicasGet::getTable }); + } + + private static volatile boolean FAIL_PRIMARY_GET = false; + + private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0); + + private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0); + + public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + RegionInfo region = c.getEnvironment().getRegionInfo(); + if (!region.getTable().equals(TABLE_NAME)) { + return; + } + if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) { + 
SECONDARY_GET_COUNT.incrementAndGet(); + } else { + PRIMARY_GET_COUNT.incrementAndGet(); + if (FAIL_PRIMARY_GET) { + throw new IOException("Inject error"); + } + } + } + } + + private static boolean allReplicasHaveRow() throws IOException { + for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { + for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) { + if (region.get(new Get(ROW), false).isEmpty()) { + return false; + } + } + } + return true; + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // 10 mins + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + TimeUnit.MINUTES.toMillis(10)); + // 1 second + TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND, + TimeUnit.SECONDS.toMicros(1)); + // set a small pause so we will retry very quickly + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); + // infinite retry + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE); + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3) + .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + // this is the fastest way to let all replicas have the row + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + TEST_UTIL.getAdmin().enableTable(TABLE_NAME); + TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + 
@Test + public void testNoReplicaRead() throws Exception { + FAIL_PRIMARY_GET = false; + SECONDARY_GET_COUNT.set(0); + AsyncTable table = getTable.get(); + Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); + for (int i = 0; i < 1000; i++) { + assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); + } + // the primary region is fine and the primary timeout is 1 second which is long enough, so we + // should not send any requests to secondary replicas even if the consistency is timeline. + Thread.sleep(5000); + assertEquals(0, SECONDARY_GET_COUNT.get()); + } + + @Test + public void testReplicaRead() throws Exception { + // fail the primary get request + FAIL_PRIMARY_GET = true; + Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); + // make sure that we could still get the value from secondary replicas + AsyncTable table = getTable.get(); + assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); + // make sure that the primary request has been canceled + Thread.sleep(5000); + int count = PRIMARY_GET_COUNT.get(); + Thread.sleep(10000); + assertEquals(count, PRIMARY_GET_COUNT.get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java index db7546fc54..46890d0963 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame; import java.io.IOException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import 
org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; @@ -52,43 +50,13 @@ public class TestZKAsyncRegistry { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKAsyncRegistry.class); + HBaseClassTestRule.forClass(TestZKAsyncRegistry.class); - private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static ZKAsyncRegistry REGISTRY; - // waits for all replicas to have region location - static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException { - TEST_UTIL.waitFor( - TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true, - new ExplainingPredicate() { - @Override - public String explainFailure() throws IOException { - return TEST_UTIL.explainTableAvailability(tbl); - } - - @Override - public boolean evaluate() throws IOException { - AtomicBoolean ready = new AtomicBoolean(true); - try { - RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); - assertEquals(3, locs.getRegionLocations().length); - IntStream.range(0, 3).forEach(i -> { - HRegionLocation loc = locs.getRegionLocation(i); - if (loc == null) { - ready.set(false); - } - }); - } catch (Exception e) { - ready.set(false); - } - return ready.get(); - } - }); - } - @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); @@ -107,14 +75,14 @@ public class TestZKAsyncRegistry { LOG.info("STARTED 
TEST"); String clusterId = REGISTRY.getClusterId().get(); String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId(); - assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, - expectedClusterId, clusterId); + assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, + clusterId); assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(), REGISTRY.getCurrentNrHRS().get().intValue()); assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), REGISTRY.getMasterAddress().get()); assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue()); - waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME); + RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3); RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); assertEquals(3, locs.getRegionLocations().length); IntStream.range(0, 3).forEach(i -> { -- 2.17.1