From 39488365d3002a2eb655febcfedffde367121b23 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 11 Dec 2018 14:46:16 +0800 Subject: [PATCH] HBASE-21538 Rewrite RegionReplicaFlushHandler to use AsyncClusterConnection --- .../hbase/client/AsyncClusterConnection.java | 8 + .../hbase/client/AsyncConnectionImpl.java | 7 + .../hbase/client/RawAsyncHBaseAdmin.java | 155 +++--------------- .../hbase/regionserver/HRegionServer.java | 3 +- .../handler/RegionReplicaFlushHandler.java | 66 ++++---- 5 files changed, 72 insertions(+), 167 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index 1327fd779b..f48cbc3222 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hbase.client; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; + /** * The asynchronous connection for internal usage. */ @@ -41,4 +44,9 @@ public interface AsyncClusterConnection extends AsyncConnection { * Get the rpc client we used to communicate with other servers. */ RpcClient getRpcClient(); + + /** + * Flush a region and get the response. + */ + CompletableFuture flush(byte[] regionName); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 5625ee8769..92d223053a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -54,6 +54,7 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; @@ -335,4 +336,10 @@ class AsyncConnectionImpl implements AsyncClusterConnection { public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { return new AsyncRegionServerAdmin(serverName, this); } + + @Override + public CompletableFuture flush(byte[] regionName) { + RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin(); + return admin.flushRegionInternal(regionName); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 4f73909c31..e3db6ab2cf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -125,128 +125,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfi import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.*; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; @@ -820,7 +699,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture flushRegion(byte[] regionName) { - CompletableFuture future = new CompletableFuture<>(); + return flushRegionInternal(regionName).thenAccept(r -> { + }); + } + + /** + * This method is for internal use only, where we need the response of the flush. + *

+ * As it exposes the protobuf message, please do NOT try to expose it as a public + * API. + */ + CompletableFuture flushRegionInternal(byte[] regionName) { + CompletableFuture future = new CompletableFuture<>(); getRegionLocation(regionName).whenComplete( (location, err) -> { if (err != null) { @@ -845,15 +735,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture flush(final ServerName serverName, final RegionInfo regionInfo) { - return this. newAdminCaller() - .serverName(serverName) - .action( - (controller, stub) -> this. adminCall( - controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), - resp -> null)) - .call(); + private CompletableFuture flush(ServerName serverName, RegionInfo regionInfo) { + return this. newAdminCaller().serverName(serverName) + .action((controller, stub) -> this + . adminCall(controller, stub, + RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName()), + (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp)) + .call(); } @Override @@ -866,7 +754,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } List> compactFutures = new ArrayList<>(); if (hRegionInfos != null) { - hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); + hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region).thenAccept(r -> { + }))); } CompletableFuture .allOf(compactFutures.toArray(new CompletableFuture[compactFutures.size()])) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 0d0a34e338..92aef51427 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2377,8 +2377,7 @@ public class HRegionServer extends HasThread implements // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler if (this.executorService != null) { - this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection, - rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region)); + this.executorService.submit(new RegionReplicaFlushHandler(this, region)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java index b917379930..a1d4cf2d69 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java @@ -20,26 +20,25 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; import java.io.InterruptedIOException; - +import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.FlushRegionCallable; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; /** * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in @@ -56,20 +55,13 @@ public class RegionReplicaFlushHandler extends EventHandler { private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class); - private final ClusterConnection connection; - private final RpcRetryingCallerFactory rpcRetryingCallerFactory; - private final RpcControllerFactory rpcControllerFactory; - private final int operationTimeout; + private final AsyncClusterConnection connection; + private final HRegion region; - public RegionReplicaFlushHandler(Server server, ClusterConnection connection, - RpcRetryingCallerFactory rpcRetryingCallerFactory, RpcControllerFactory rpcControllerFactory, - int operationTimeout, HRegion region) { + public RegionReplicaFlushHandler(Server server, HRegion region) { super(server, EventType.RS_REGION_REPLICA_FLUSH); - this.connection = connection; - this.rpcRetryingCallerFactory = rpcRetryingCallerFactory; - this.rpcControllerFactory = rpcControllerFactory; - this.operationTimeout = operationTimeout; + this.connection = server.getAsyncClusterConnection(); this.region = region; } @@ -103,7 +95,7 @@ public class RegionReplicaFlushHandler extends EventHandler { return numRetries; } - void triggerFlushInPrimaryRegion(final HRegion region) throws IOException, RuntimeException { + void triggerFlushInPrimaryRegion(final HRegion region) throws IOException { long pause = connection.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); @@ -115,24 +107,34 @@ public class RegionReplicaFlushHandler extends EventHandler { .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region " + region.getRegionInfo().getEncodedName() + " to trigger a flush"); } + boolean reload = false; while (!region.isClosing() && !region.isClosed() && !server.isAborted() && !server.isStopped()) { - FlushRegionCallable flushCallable = new FlushRegionCallable( - connection, rpcControllerFactory, - RegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()), true); // TODO: flushRegion() is a blocking call waiting for the flush to complete. Ideally we // do not have to wait for the whole flush here, just initiate it. FlushRegionResponse response = null; try { - response = rpcRetryingCallerFactory.newCaller() - .callWithRetries(flushCallable, this.operationTimeout); - } catch (IOException ex) { - if (ex instanceof TableNotFoundException - || connection.isTableDisabled(region.getRegionInfo().getTable())) { - return; + response = connection.flush(region.getRegionInfo().getRegionName()).get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + try { + if (cause instanceof TableNotFoundException || + connection.getAdmin().isTableDisabled(region.getRegionInfo().getTable()).get()) { + return; + } + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + continue; + } catch (ExecutionException e1) { + Throwables.propagateIfPossible(e1.getCause(), IOException.class); + throw new IOException(e1.getCause()); } - throw ex; + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw new IOException(e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; } if (response.getFlushed()) { -- 2.17.1