From af6c023879d61dc5296aae93d46444c6e3346e28 Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Wed, 12 Dec 2018 09:33:33 +0800
Subject: [PATCH] HBASE-21538 Rewrite RegionReplicaFlushHandler to use
 AsyncClusterConnection

---
 .../hbase/client/AsyncClusterConnection.java  |   8 ++
 .../hbase/client/AsyncConnectionImpl.java     |   8 ++
 .../hbase/client/RawAsyncHBaseAdmin.java      |  70 +++++-----
 .../hbase/regionserver/HRegionServer.java     |   3 +-
 .../handler/RegionReplicaFlushHandler.java    | 125 +++++++++++-------
 5 files changed, 132 insertions(+), 82 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 1327fd779b..f1f64ca164 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
 /**
  * The asynchronous connection for internal usage.
  */
@@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the rpc client we used to communicate with other servers.
    */
   RpcClient getRpcClient();
+
+  /**
+   * Flush a region and get the response.
+   */
+  CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 5625ee8769..3e9d32eacc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -54,6 +54,7 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -335,4 +336,11 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
   public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
     return new AsyncRegionServerAdmin(serverName, this);
   }
+
+  @Override
+  public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+    return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 4f73909c31..c52362d2f0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -820,40 +820,49 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionLocation(regionName).whenComplete(
-      (location, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        ServerName serverName = location.getServerName();
-        if (serverName == null) {
-          future.completeExceptionally(new NoServerForRegionException(Bytes
-              .toStringBinary(regionName)));
-          return;
+    return flushRegionInternal(regionName, false).thenAccept(r -> {
+    });
+  }
+
+  /**
+   * This method is for internal use only, where we need the response of the flush.
+   * <p/>
+   * As it exposes the protobuf message, please do NOT try to expose it as a public
+   * API.
+   */
+  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
+      boolean writeFlushWALMarker) {
+    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete((location, err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+        return;
+      }
+      ServerName serverName = location.getServerName();
+      if (serverName == null) {
+        future
+          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
+        return;
+      }
+      flush(serverName, location.getRegion(), writeFlushWALMarker).whenComplete((ret, err2) -> {
+        if (err2 != null) {
+          future.completeExceptionally(err2);
+        } else {
+          future.complete(ret);
         }
-        flush(serverName, location.getRegion())
-          .whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
       });
+    });
     return future;
   }
 
-  private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
-    return this.<Void> newAdminCaller()
-      .serverName(serverName)
-      .action(
-        (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
-          controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
-              .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
-          resp -> null))
-      .call();
+  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
+      boolean writeFlushWALMarker) {
+    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
+      .action((controller, stub) -> this
+        .<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
+          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
+          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
+      .call();
   }
 
   @Override
@@ -866,7 +875,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
     List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
     if (hRegionInfos != null) {
-      hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
+      hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
+      })));
     }
     CompletableFuture
         .allOf(compactFutures.toArray(new CompletableFuture[compactFutures.size()]))
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0d0a34e338..92aef51427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2377,8 +2377,7 @@ public class HRegionServer extends HasThread implements
 
     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
-      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-        rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+      this.executorService.submit(new RegionReplicaFlushHandler(this, region));
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index b917379930..7baf124441 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -20,26 +20,25 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.FlushRegionCallable;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
 
 /**
  * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
@@ -56,20 +55,13 @@ public class RegionReplicaFlushHandler extends EventHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
 
-  private final ClusterConnection connection;
-  private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
-  private final RpcControllerFactory rpcControllerFactory;
-  private final int operationTimeout;
+  private final AsyncClusterConnection connection;
+
   private final HRegion region;
 
-  public RegionReplicaFlushHandler(Server server, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory,
-      int operationTimeout, HRegion region) {
+  public RegionReplicaFlushHandler(Server server, HRegion region) {
     super(server, EventType.RS_REGION_REPLICA_FLUSH);
-    this.connection = connection;
-    this.rpcRetryingCallerFactory = rpcRetryingCallerFactory;
-    this.rpcControllerFactory = rpcControllerFactory;
-    this.operationTimeout = operationTimeout;
+    this.connection = server.getAsyncClusterConnection();
     this.region = region;
   }
 
@@ -103,7 +95,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
     return numRetries;
   }
 
-  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException {
+  void triggerFlushInPrimaryRegion(final HRegion region) throws IOException {
     long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
 
@@ -117,44 +109,69 @@
     }
 
     while (!region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()) {
-      FlushRegionCallable flushCallable = new FlushRegionCallable(
-        connection, rpcControllerFactory,
-        RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true);
-
       // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we
       // do not have to wait for the whole flush here, just initiate it.
-      FlushRegionResponse response = null;
+      FlushRegionResponse response;
       try {
-        response = rpcRetryingCallerFactory.<FlushRegionResponse> newCaller()
-          .callWithRetries(flushCallable, this.operationTimeout);
-      } catch (IOException ex) {
-        if (ex instanceof TableNotFoundException
-            || connection.isTableDisabled(region.getRegionInfo().getTable())) {
-          return;
+        response = connection.flush(ServerRegionReplicaUtil
+          .getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionName(), true).get();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        try {
+          if (cause instanceof TableNotFoundException ||
+            connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()).get()) {
+            return;
+          }
+        } catch (InterruptedException e1) {
+          throw new InterruptedIOException(e1.getMessage());
+        } catch (ExecutionException e1) {
+          Throwables.propagateIfPossible(e1.getCause(), IOException.class);
+          throw new IOException(e1.getCause());
         }
-        throw ex;
+        if (!counter.shouldRetry()) {
+          Throwables.propagateIfPossible(cause, IOException.class);
+          throw new IOException(cause);
+        }
+        // The reason we need to retry here is that the retry logic for an asynchronous admin
+        // request is much simpler than for a normal operation: if we fail to locate the region
+        // once, we will throw the exception out and will not try to relocate again. So here we
+        // need to add some retries by ourselves to prevent shutting down the region server too
+        // frequently...
+        LOG.debug("Failed to trigger a flush of primary region replica {} of region {}, retry={}",
+          ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+            .getRegionNameAsString(),
+          region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes(), cause);
+        try {
+          counter.sleepUntilNextRetry();
+        } catch (InterruptedException e1) {
+          throw new InterruptedIOException(e1.getMessage());
+        }
+        continue;
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(e.getMessage());
       }
 
       if (response.getFlushed()) {
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica "
-              + ServerRegionReplicaUtil
-                .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-              + " of region " + region.getRegionInfo().getEncodedName()
-              + " Now waiting and blocking reads until observing a full flush cycle");
+          LOG.debug("Successfully triggered a flush of primary region replica " +
+            ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+              .getRegionNameAsString() +
+            " of region " + region.getRegionInfo().getRegionNameAsString() +
+            " Now waiting and blocking reads until observing a full flush cycle");
         }
         break;
       } else {
         if (response.hasWroteFlushWalMarker()) {
-          if(response.getWroteFlushWalMarker()) {
+          if (response.getWroteFlushWalMarker()) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
-                  + "region replica " + ServerRegionReplicaUtil
-                    .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                  + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
-                  + "blocking reads until observing a flush marker");
+              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary " +
+                "region replica " +
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString() +
+                " of region " + region.getRegionInfo().getRegionNameAsString() +
+                " Now waiting and " + "blocking reads until observing a flush marker");
             }
             break;
           } else {
@@ -162,15 +179,23 @@
             // closing or already flushing. Retry flush again after some sleep.
             if (!counter.shouldRetry()) {
               throw new IOException("Cannot cause primary to flush or drop a wal marker after " +
-                  "retries. Failing opening of this region replica "
-                  + region.getRegionInfo().getEncodedName());
+                counter.getAttemptTimes() + " retries. Failing opening of this region replica " +
+                region.getRegionInfo().getRegionNameAsString());
+            } else {
+              LOG.warn(
+                "Cannot cause primary replica {} to flush or drop a wal marker " +
+                  "for region replica {}, retry={}",
+                ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo())
+                  .getRegionNameAsString(),
+                region.getRegionInfo().getRegionNameAsString(), counter.getAttemptTimes());
             }
           }
         } else {
           // nothing to do. Are we dealing with an old server?
-          LOG.warn("Was not able to trigger a flush from primary region due to old server version? "
-              + "Continuing to open the secondary region replica: "
-              + region.getRegionInfo().getEncodedName());
+          LOG.warn(
+            "Was not able to trigger a flush from primary region due to old server version? " +
+              "Continuing to open the secondary region replica: " +
+              region.getRegionInfo().getRegionNameAsString());
           region.setReadsEnabled(true);
           break;
         }
-- 
2.17.1
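
Not part of the patch: a minimal sketch of how the new internal AsyncClusterConnection.flush(regionName, writeFlushWALMarker) API added above is expected to be driven, mirroring RegionReplicaFlushHandler.triggerFlushInPrimaryRegion(). The class PrimaryFlushExample, the method triggerPrimaryFlush and its parameters are hypothetical names invented for illustration; only AsyncClusterConnection, RegionInfo and FlushRegionResponse come from the patch/HBase itself, and the handler's retry loop is deliberately elided.

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;

final class PrimaryFlushExample {
  /**
   * Hypothetical helper: ask the primary replica to flush, requesting a WAL marker even when the
   * memstore is empty, and report whether the secondary can expect a flush or marker entry.
   */
  static boolean triggerPrimaryFlush(AsyncClusterConnection conn, RegionInfo primary)
      throws IOException, InterruptedException {
    try {
      // writeFlushWALMarker = true, as in RegionReplicaFlushHandler above
      FlushRegionResponse resp = conn.flush(primary.getRegionName(), true).get();
      return resp.getFlushed() ||
        (resp.hasWroteFlushWalMarker() && resp.getWroteFlushWalMarker());
    } catch (ExecutionException e) {
      // unwrap the async failure, as the handler does via Throwables.propagateIfPossible
      throw new IOException(e.getCause());
    }
  }
}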