diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java index b610d84ae4..086a826741 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; */ @InterfaceAudience.Private public final class VersionInfoUtil { + private static final ThreadLocal NonCallVersion = new ThreadLocal<>(); private VersionInfoUtil() { /* UTIL CLASS ONLY */ @@ -67,11 +69,30 @@ public final class VersionInfoUtil { return false; } + /** We intend to use the local version for service call shortcut(s), so we use an interface + * compatible with a typical service call, with 2 args, return type, and an exception type. + */ + public interface ServiceCallFunction { + R apply(T1 t1, T2 t2) throws E; + } + + public static R callWithVersion( + ServiceCallFunction f, T1 t1, T2 t2) throws E { + // Note: just as RpcServer.CurCall, this will only apply on the current thread. + NonCallVersion.set(ProtobufUtil.getVersionInfo()); + try { + return f.apply(t1, t2); + } finally { + NonCallVersion.remove(); + } + } + /** * @return the versionInfo extracted from the current RpcCallContext */ public static HBaseProtos.VersionInfo getCurrentClientVersionInfo() { - return RpcServer.getCurrentCall().map(RpcCallContext::getClientVersionInfo).orElse(null); + return RpcServer.getCurrentCall().map( + RpcCallContext::getClientVersionInfo).orElse(NonCallVersion.get()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java index f771210b87..231e751782 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -72,6 +72,8 @@ public abstract class ProcedurePrepareLatch { } private static boolean hasProcedureSupport(int major, int minor) { + // Note: this won't work if the shortcut similar to the one in HRegionServer is used + // without the corresponding version handling. return VersionInfoUtil.currentClientHasMinimumVersion(major, minor); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 34a6c13924..a7ed3ba0bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.conf.ConfigurationManager; @@ -118,6 +119,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; @@ -213,6 +215,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; @@ -2578,7 +2581,7 @@ public class HRegionServer extends HasThread implements // If we are on the active master, use the shortcut if (this instanceof HMaster && sn.equals(getServerName())) { - intRssStub = ((HMaster)this).getMasterRpcServices(); + intRssStub = new MasterRpcServicesVersionWrapper(((HMaster)this).getMasterRpcServices()); intLockStub = ((HMaster)this).getMasterRpcServices(); break; } @@ -3854,4 +3857,80 @@ public class HRegionServer extends HasThread implements System.exit(1); } } + + @FunctionalInterface + private interface ServiceCallFunction + extends VersionInfoUtil.ServiceCallFunction { + } + + /** + * A wrapper class for masterrpcservices shortcut that ensures a client version is available + * to the callee without a current RPC call. + */ + private static class MasterRpcServicesVersionWrapper + implements RegionServerStatusService.BlockingInterface { + private final MasterRpcServices masterRpcServices; + private final ServiceCallFunction< + RegionServerStartupRequest, RegionServerStartupResponse> startupCall; + private final ServiceCallFunction< + RegionServerReportRequest, RegionServerReportResponse> reportCall; + + + public MasterRpcServicesVersionWrapper(MasterRpcServices masterRpcServices) { + this.masterRpcServices = masterRpcServices; + this.startupCall = (c, req) -> masterRpcServices.regionServerStartup(c, req); + this.reportCall = (c, req) -> masterRpcServices.regionServerReport(c, req); + } + + @Override + public RegionServerStartupResponse regionServerStartup( + RpcController controller, RegionServerStartupRequest request) throws ServiceException { + return VersionInfoUtil.callWithVersion(startupCall, controller, request); + } + + @Override + public RegionServerStatusProtos.RegionServerReportResponse regionServerReport( + RpcController controller, RegionServerReportRequest request) throws ServiceException { + return VersionInfoUtil.callWithVersion(reportCall, controller, request); + } + + @Override + public RegionServerStatusProtos.ReportRSFatalErrorResponse reportRSFatalError( + RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException { + return masterRpcServices.reportRSFatalError(controller, request); + } + + @Override + public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId( + RpcController controller, GetLastFlushedSequenceIdRequest request) + throws ServiceException { + return masterRpcServices.getLastFlushedSequenceId(controller, request); + } + + @Override + public ReportRegionStateTransitionResponse reportRegionStateTransition( + RpcController controller, ReportRegionStateTransitionRequest request) + throws ServiceException { + return masterRpcServices.reportRegionStateTransition(controller, request); + } + + @Override + public RegionServerStatusProtos.RegionSpaceUseReportResponse reportRegionSpaceUse( + RpcController controller, RegionSpaceUseReportRequest request) throws ServiceException { + return masterRpcServices.reportRegionSpaceUse(controller, request); + } + + @Override + public RegionServerStatusProtos.ReportProcedureDoneResponse reportProcedureDone( + RpcController controller, ReportProcedureDoneRequest request) throws ServiceException { + return masterRpcServices.reportProcedureDone(controller, request); + } + + @Override + public RegionServerStatusProtos.FileArchiveNotificationResponse reportFileArchival( + RpcController controller, RegionServerStatusProtos.FileArchiveNotificationRequest request) + throws ServiceException { + return masterRpcServices.reportFileArchival(controller, request); + } + } }