diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java index 01ba3bd..29eb58c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -106,12 +106,17 @@ public abstract class AbstractClientRequestInterceptor try { // Do not create a proxy user if user name matches the user name on // current UGI - if (userName.equalsIgnoreCase( - UserGroupInformation.getCurrentUser().getUserName())) { - user = UserGroupInformation.getCurrentUser(); - } else { + if (UserGroupInformation.isSecurityEnabled()) { user = UserGroupInformation.createProxyUser(userName, - UserGroupInformation.getCurrentUser()); + UserGroupInformation.getLoginUser()); + } else { + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } } } catch (IOException e) { String message = "Error while creating Router ClientRM Service for user:"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index d2adfdc..a0c7623 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -619,4 +619,12 @@ public abstract class ApplicationSubmissionContext { @Unstable public abstract void setApplicationSchedulingPropertiesMap( Map schedulingEnvMap); + + @Public + @Unstable + public abstract String getHomeCluster(); + + @Public + @Unstable + public abstract void setHomeCluster(String homeClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index b30224e..b207ddf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -739,4 +739,23 @@ extends ApplicationSubmissionContext { this.schedulingProperties.clear(); this.schedulingProperties.putAll(schedulingPropertyMap); } + + @Override + public String getHomeCluster() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasHomeCluster()) { + return null; + } + return p.getHomeCluster(); + } + + @Override + public void setHomeCluster(String homeCluster) { + maybeInitBuilder(); + if (homeCluster == null) { + builder.clearHomeCluster(); + return; + } + builder.setHomeCluster(homeCluster); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/DelegationTokenFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/DelegationTokenFetcher.java new file mode 100644 index 0000000..15101ff --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/DelegationTokenFetcher.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.yarn.server.router.security; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +import java.io.IOException; + +public abstract class DelegationTokenFetcher { + + protected RouterDelegationTokenSecretManager secretManager; + + public abstract void start() throws Exception; + + public DelegationTokenFetcher(RouterDelegationTokenSecretManager secretManager) { + this.secretManager = secretManager; + } + + protected void updateToken(RMDelegationTokenIdentifier identifier, long renewDate) throws IOException { + secretManager.addPersistedDelegationToken(identifier, renewDate); + } + + protected void removeToken(Token token, String user) throws IOException { + secretManager.cancelToken(token, user); + } + + protected void updateMasterKey(DelegationKey key) throws IOException { + secretManager.addKey(key); + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ExtendDelegationKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ExtendDelegationKey.java new file mode 100644 index 0000000..3a114c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ExtendDelegationKey.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.yarn.server.router.security; + +import org.apache.hadoop.security.token.delegation.DelegationKey; + +public class ExtendDelegationKey extends DelegationKey { + + private String cluster; + + public ExtendDelegationKey(String subcluster, DelegationKey key) { + super(key.getKeyId(), key.getExpiryDate(), key.getEncodedKey()); + this.cluster = subcluster; + } + + @Override + public int getKeyId() { + return enhance(); + } + + + public int enhance() { + final int p = 16777619; + int hash = (int) 2166136261L; + for (byte b : cluster.getBytes()) + hash = (hash ^ b) * p; + hash += hash << 13; + hash ^= hash >> 7; + hash += hash << 3; + hash ^= hash >> 17; + hash += hash << 5; + return hash + super.getKeyId(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ExtendRMDelegationTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ExtendRMDelegationTokenIdentifier.java new file mode 100644 index 0000000..befde13 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ExtendRMDelegationTokenIdentifier.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.yarn.server.router.security; + 
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +public class ExtendRMDelegationTokenIdentifier extends RMDelegationTokenIdentifier { + + private String cluster; + + public ExtendRMDelegationTokenIdentifier(String cluster, RMDelegationTokenIdentifier rmDTIdentifier) { + super(rmDTIdentifier.getOwner(), rmDTIdentifier.getRenewer(), rmDTIdentifier.getRealUser()); + setSequenceNumber(rmDTIdentifier.getSequenceNumber()); + setMasterKeyId(rmDTIdentifier.getMasterKeyId()); + setIssueDate(rmDTIdentifier.getIssueDate()); + setMaxDate(rmDTIdentifier.getMaxDate()); + this.cluster = cluster; + } + + public int getExtendMasterKeyId() { + return enhance(); + } + + public int enhance() { + final int p = 16777619; + int hash = (int) 2166136261L; + for (byte b : cluster.getBytes()) + hash = (hash ^ b) * p; + hash += hash << 13; + hash ^= hash >> 7; + hash += hash << 3; + hash ^= hash >> 17; + hash += hash << 5; + return hash + super.getMasterKeyId(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 07eaf97..716ebb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -19,14 +19,12 @@ package org.apache.hadoop.yarn.server.router.clientrm; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; +import java.util.*; 
+import java.util.concurrent.*; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -94,8 +92,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsReque import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; @@ -175,6 +175,18 @@ public class FederationClientInterceptor + "is correct"); } + private boolean isAllowedDelegationTokenOp() throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS, + UserGroupInformation.AuthenticationMethod.KERBEROS_SSL, + UserGroupInformation.AuthenticationMethod.CERTIFICATE) + .contains(UserGroupInformation.getCurrentUser() + .getRealAuthenticationMethod()); + } else { + return true; + } + } + @VisibleForTesting protected ApplicationClientProtocol getClientRMProxyForSubCluster( SubClusterId subClusterId) throws YarnException { @@ -353,73 +365,46 @@ public class FederationClientInterceptor ApplicationId applicationId = request.getApplicationSubmissionContext().getApplicationId(); - List 
blacklist = new ArrayList(); - - for (int i = 0; i < numSubmitRetries; ++i) { - - SubClusterId subClusterId = policyFacade.getHomeSubcluster( - request.getApplicationSubmissionContext(), blacklist); - LOG.info("submitApplication appId" + applicationId + " try #" + i - + " on SubCluster " + subClusterId); + String homeCluster = request.getApplicationSubmissionContext().getHomeCluster(); + if (homeCluster != null) { + Map subClustersActive = + federationFacade.getSubClusters(true); + if (subClustersActive == null || subClustersActive.size() < 1) { + RouterServerUtil.logAndThrowException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); + } + SubClusterId homeClusterId = SubClusterId.newInstance(homeCluster); + if (!subClustersActive.containsKey(homeClusterId)) + throw new YarnException("The HomeCluster " + homeCluster + + " does not exist in active SubClusters."); ApplicationHomeSubCluster appHomeSubCluster = - ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); - - if (i == 0) { - try { - // persist the mapping of applicationId and the subClusterId which has - // been selected as its home - subClusterId = - federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); - } catch (YarnException e) { - routerMetrics.incrAppsFailedSubmitted(); - String message = "Unable to insert the ApplicationId " + applicationId - + " into the FederationStateStore"; - RouterServerUtil.logAndThrowException(message, e); - } - } else { - try { - // update the mapping of applicationId and the home subClusterId to - // the new subClusterId we have selected - federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); - } catch (YarnException e) { - String message = "Unable to update the ApplicationId " + applicationId - + " into the FederationStateStore"; - SubClusterId subClusterIdInStateStore = - federationFacade.getApplicationHomeSubCluster(applicationId); - if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application " + 
applicationId - + " already submitted on SubCluster " + subClusterId); - } else { - routerMetrics.incrAppsFailedSubmitted(); - RouterServerUtil.logAndThrowException(message, e); - } - } - } + ApplicationHomeSubCluster.newInstance(applicationId, homeClusterId); + homeClusterId = federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); - ApplicationClientProtocol clientRMProxy = - getClientRMProxyForSubCluster(subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(homeClusterId); SubmitApplicationResponse response = null; try { response = clientRMProxy.submitApplication(request); } catch (Exception e) { LOG.warn("Unable to submit the application " + applicationId - + "to SubCluster " + subClusterId.getId(), e); + + "to SubCluster " + homeClusterId.getId(), e); } - if (response != null) { LOG.info("Application " - + request.getApplicationSubmissionContext().getApplicationName() - + " with appId " + applicationId + " submitted on " + subClusterId); + + request.getApplicationSubmissionContext().getApplicationName() + + " with appId " + applicationId + " submitted on " + homeClusterId); long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); return response; - } else { - // Empty response from the ResourceManager. - // Blacklist this subcluster for this request. 
- blacklist.add(subClusterId); } + } else { + String errMsg = "Application " + + request.getApplicationSubmissionContext().getApplicationName() + + " does not specify home cluster"; + LOG.error(errMsg); + throw new YarnException(errMsg); } routerMetrics.incrAppsFailedSubmitted(); @@ -530,35 +515,78 @@ public class FederationClientInterceptor subClusterId = federationFacade .getApplicationHomeSubCluster(request.getApplicationId()); } catch (YarnException e) { - routerMetrics.incrAppsFailedRetrieved(); - RouterServerUtil - .logAndThrowException("Application " + request.getApplicationId() - + " does not exist in FederationStateStore", e); + LOG.debug("Application " + request.getApplicationId() + + " does not exist in FederationStateStore"); } + GetApplicationReportResponse response = null; + if (subClusterId != null) { + ApplicationClientProtocol clientRMProxy = + getClientRMProxyForSubCluster(subClusterId); - ApplicationClientProtocol clientRMProxy = - getClientRMProxyForSubCluster(subClusterId); + try { + response = clientRMProxy.getApplicationReport(request); + } catch (Exception e) { + routerMetrics.incrAppsFailedRetrieved(); + LOG.error("Unable to get the application report for " + + request.getApplicationId() + "to SubCluster " + + subClusterId.getId(), e); + throw e; + } - GetApplicationReportResponse response = null; - try { - response = clientRMProxy.getApplicationReport(request); - } catch (Exception e) { - routerMetrics.incrAppsFailedRetrieved(); - LOG.error("Unable to get the application report for " - + request.getApplicationId() + "to SubCluster " - + subClusterId.getId(), e); - throw e; - } + if (response == null) { + LOG.error("No response when attempting to retrieve the report of " + + "the application " + request.getApplicationId() + " to SubCluster " + + subClusterId.getId()); + } - if (response == null) { - LOG.error("No response when attempting to retrieve the report of " - + "the application " + request.getApplicationId() + " to SubCluster " - + 
subClusterId.getId()); - } + long stopTime = clock.getTime(); + routerMetrics.succeededAppsRetrieved(stopTime - startTime); + return response; + } else { + Map subClustersActive = + federationFacade.getSubClusters(true); + ExecutorService threadPool = Executors.newFixedThreadPool(subClustersActive.size()); + + CompletionService completionService = + new ExecutorCompletionService(threadPool); + + for (SubClusterId subCluster : subClustersActive.keySet()) { + ApplicationClientProtocol clientRMProxy = + getClientRMProxyForSubCluster(subCluster); + + completionService.submit(new Callable() { + @Override + public GetApplicationReportResponse call() throws Exception { + GetApplicationReportResponse response; + try { + response = clientRMProxy.getApplicationReport(request); + } catch (ApplicationNotFoundException e) { + return null; + } + return response; + } + }); + } - long stopTime = clock.getTime(); - routerMetrics.succeededAppsRetrieved(stopTime - startTime); - return response; + for (int i = 0; i < subClustersActive.size(); i++) { + try { + response = completionService.take().get(); + if (response != null) { + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + threadPool.shutdownNow(); + if (response == null) { + routerMetrics.incrAppsFailedRetrieved(); + RouterServerUtil + .logAndThrowException("Application " + request.getApplicationId() + + " does not exist in FederationStateStore and all subcluster", null); + } + return response; + } } @Override @@ -570,7 +598,21 @@ public class FederationClientInterceptor @Override public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + Map subClustersActive = + federationFacade.getSubClusters(true); + if (request.getClusterId() != null) { + String cluster = request.getClusterId(); + SubClusterId subClusterId = SubClusterId.newInstance(cluster); + if (subClustersActive.keySet().contains(subClusterId)) { + 
return getClientRMProxyForSubCluster(subClusterId).getClusterMetrics(request); + } + } + Map clusterMetrics = new TreeMap(); + for (SubClusterId subClusterId : subClustersActive.keySet()) { + clusterMetrics.put(subClusterId, + getClientRMProxyForSubCluster(subClusterId).getClusterMetrics(request)); + } + return RouterYarnClientUtils.merge(clusterMetrics.values()); } @Override @@ -674,7 +716,37 @@ public class FederationClientInterceptor @Override public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + try { + if (!isAllowedDelegationTokenOp()) { + throw new IOException( + "Delegation Token can be issued only with kerberos authentication"); + } + + String cluster = request.getCluster(); + if (cluster != null) { + Map subClustersActive = + federationFacade.getSubClusters(true); + if (subClustersActive == null || subClustersActive.size() < 1) { + RouterServerUtil.logAndThrowException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); + } + SubClusterId homeClusterId = SubClusterId.newInstance(cluster); + if (!subClustersActive.containsKey(homeClusterId)) + throw new YarnException("The HomeCluster " + cluster + + " does not exist in active SubClusters."); + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(homeClusterId); + GetDelegationTokenResponse response = clientRMProxy.getDelegationToken(request); + + return response; + } else { + String errMsg = "GetDelegationTokenRequest does not specify cluster"; + LOG.error(errMsg); + throw new YarnException(errMsg); + } + } catch(IOException io) { + throw RPCUtil.getRemoteException(io); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterMetricsRequest.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterMetricsRequest.java index fb11e18..2701603 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterMetricsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetClusterMetricsRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.util.Records; @@ -41,4 +42,13 @@ public abstract class GetClusterMetricsRequest { Records.newRecord(GetClusterMetricsRequest.class); return request; } + + @Public + @Unstable + public abstract String getClusterId(); + + @Public + @Unstable + public abstract void setClusterId(String clusterId); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterMetricsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterMetricsRequestPBImpl.java index 2288da8..a3717a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterMetricsRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetClusterMetricsRequestPBImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import 
org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProtoOrBuilder; import com.google.protobuf.TextFormat; @@ -67,4 +68,30 @@ public class GetClusterMetricsRequestPBImpl extends GetClusterMetricsRequest { public String toString() { return TextFormat.shortDebugString(getProto()); } -} + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetClusterMetricsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getClusterId() { + GetClusterMetricsRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasClusterId()) { + return null; + } + return p.getClusterId(); + } + + @Override + public void setClusterId(String clusterId) { + maybeInitBuilder(); + if (clusterId == null) { + builder.clearClusterId(); + return; + } + builder.setClusterId(clusterId); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenRequest.java index 5268d8f..1b17cb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetDelegationTokenRequest.java @@ -47,4 +47,13 @@ public abstract class GetDelegationTokenRequest { @Public @Stable public abstract void setRenewer(String renewer); + + @Public + @Stable + public abstract String getCluster(); + + @Public + @Stable + public abstract void setCluster(String cluster); + } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenRequestPBImpl.java index 435b807..7e16e27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetDelegationTokenRequestPBImpl.java @@ -30,6 +30,7 @@ import com.google.protobuf.TextFormat; public class GetDelegationTokenRequestPBImpl extends GetDelegationTokenRequest { String renewer; + String cluster; GetDelegationTokenRequestProto proto = GetDelegationTokenRequestProto.getDefaultInstance(); @@ -64,6 +65,23 @@ public class GetDelegationTokenRequestPBImpl extends GetDelegationTokenRequest { this.renewer = renewer; } + @Override + public String getCluster() { + GetDelegationTokenRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.cluster != null) + return this.cluster; + this.cluster = p.getCluster(); + return this.cluster; + } + + @Override + public void setCluster(String cluster) { + maybeInitBuilder(); + if (cluster == null) + builder.clearCluster(); + this.cluster = cluster; + } + public GetDelegationTokenRequestProto getProto() { mergeLocalToProto(); proto = viaProto ? 
proto : builder.build(); @@ -95,6 +113,9 @@ public class GetDelegationTokenRequestPBImpl extends GetDelegationTokenRequest { if (renewer != null) { builder.setRenewer(this.renewer); } + if (cluster != null) { + builder.setCluster(this.cluster); + } } private void mergeLocalToProto() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java index 76050d0..4d8b590 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -82,7 +83,7 @@ public class Router extends CompositeService { } protected void doSecureLogin() throws IOException { - // TODO YARN-6539 Create SecureLogin inside Router + SecurityUtil.login(this.conf, YarnConfiguration.ROUTER_KEYTAB, YarnConfiguration.ROUTER_PRINCIPAL); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 73cc185..dc4ff56 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -102,6 +103,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.router.security.DelegationTokenFetcher; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; +import org.apache.hadoop.yarn.server.router.security.ZKDelegationTokenFetcher; import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +130,8 @@ public class RouterClientRMService extends AbstractService private Server server; private InetSocketAddress listenerEndpoint; + private RouterDelegationTokenSecretManager routerDTSecretManager; + private DelegationTokenFetcher tokenFetcher; // For each user we store an interceptors' pipeline. 
// For performance issue we use LRU cache to keep in memory the newest ones @@ -137,12 +143,35 @@ public class RouterClientRMService extends AbstractService } @Override + protected void serviceInit(Configuration conf) throws Exception { + long secretKeyInterval = + conf.getLong(YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, + YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = + conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = + conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + this.routerDTSecretManager = + new RouterDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, + 3600000); + ZKCuratorManager zkManager = new ZKCuratorManager(conf); + zkManager.start(); + this.tokenFetcher = new ZKDelegationTokenFetcher(conf, zkManager, routerDTSecretManager); + super.serviceInit(conf); + } + + @Override protected void serviceStart() throws Exception { LOG.info("Starting Router ClientRMService"); Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); UserGroupInformation.setConfiguration(conf); + this.tokenFetcher.start(); + this.routerDTSecretManager.startThreads(); + this.listenerEndpoint = conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST, YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, @@ -163,7 +192,7 @@ public class RouterClientRMService extends AbstractService YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT); this.server = rpc.getServer(ApplicationClientProtocol.class, this, - listenerEndpoint, serverConf, null, numWorkerThreads); + listenerEndpoint, serverConf, this.routerDTSecretManager, numWorkerThreads); this.server.start(); LOG.info("Router ClientRMService listening on address: " diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java new file mode 100644 index 0000000..3f30cca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java @@ -0,0 +1,56 @@ +package org.apache.hadoop.yarn.server.router.security; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class RouterDelegationTokenSecretManager + extends AbstractDelegationTokenSecretManager { + + private static final Log LOG = LogFactory + .getLog(RouterDelegationTokenSecretManager.class); + + public RouterDelegationTokenSecretManager(long delegationKeyUpdateInterval, + long delegationTokenMaxLifetime, + long delegationTokenRenewInterval, + long delegationTokenRemoverScanInterval) { + super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, + delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + } + + @Override + public RMDelegationTokenIdentifier createIdentifier() { + return new RMDelegationTokenIdentifier(); + } + + @Override + public synchronized void addKey(DelegationKey key) throws IOException { + this.allKeys.put(key.getKeyId(), key); + } + + 
@Override + public synchronized void addPersistedDelegationToken(RMDelegationTokenIdentifier identifier, long renewDate) throws IOException { + int keyId = ((ExtendRMDelegationTokenIdentifier) identifier).getExtendMasterKeyId(); + DelegationKey dKey = (DelegationKey)this.allKeys.get(keyId); + if (dKey == null) { + LOG.warn("No KEY found for persisted identifier " + "(" + identifier + ")"); + } else { + byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); + if (identifier.getSequenceNumber() > this.getDelegationTokenSeqNum()) { + this.setDelegationTokenSeqNum(identifier.getSequenceNumber()); + } + this.currentTokens.put(identifier, new AbstractDelegationTokenSecretManager.DelegationTokenInformation(renewDate, password, this.getTrackingIdIfEnabled(identifier))); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java new file mode 100644 index 0000000..e5eb99d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.util.Collection; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; + +/** + * Util class for Router Yarn client API calls. + */ +public final class RouterYarnClientUtils { + + private RouterYarnClientUtils() { + } + + public static GetClusterMetricsResponse merge( + Collection responses) { + YarnClusterMetrics tmp = YarnClusterMetrics.newInstance(0); + for (GetClusterMetricsResponse response : responses) { + YarnClusterMetrics metrics = response.getClusterMetrics(); + tmp.setNumNodeManagers( + tmp.getNumNodeManagers() + metrics.getNumNodeManagers()); + tmp.setNumActiveNodeManagers( + tmp.getNumActiveNodeManagers() + metrics.getNumActiveNodeManagers()); + tmp.setNumDecommissionedNodeManagers( + tmp.getNumDecommissionedNodeManagers() + metrics + .getNumDecommissionedNodeManagers()); + tmp.setNumLostNodeManagers( + tmp.getNumLostNodeManagers() + metrics.getNumLostNodeManagers()); + tmp.setNumRebootedNodeManagers(tmp.getNumRebootedNodeManagers() + metrics + .getNumRebootedNodeManagers()); + tmp.setNumUnhealthyNodeManagers( + tmp.getNumUnhealthyNodeManagers() + metrics + .getNumUnhealthyNodeManagers()); + } + return GetClusterMetricsResponse.newInstance(tmp); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto index 037a878..521cbdc 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto @@ -51,6 +51,7 @@ message CredentialsProto { message GetDelegationTokenRequestProto { required string renewer = 1; + optional string cluster = 2 [default = ""]; } message GetDelegationTokenResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/SimpleAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/SimpleAMRMProxyPolicy.java new file mode 100644 index 0000000..dcbb919 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/SimpleAMRMProxyPolicy.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SimpleAMRMProxyPolicy extends AbstractAMRMProxyPolicy{ + + public static final Logger LOG = + LoggerFactory.getLogger(SimpleAMRMProxyPolicy.class); + + private SubClusterId homeSubcluster; + + @Override + public void reinitialize( + 
FederationPolicyInitializationContext policyContext) + throws FederationPolicyInitializationException { + if (policyContext.getHomeSubcluster() == null) { + throw new FederationPolicyInitializationException("The homeSubcluster " + + "field in the context must be initialized to use this policy"); + } + this.homeSubcluster = policyContext.getHomeSubcluster(); + setPolicyContext(policyContext); + } + + @Override + public Map> splitResourceRequests( + List resourceRequests) throws YarnException { + Map> answer = new HashMap<>(); + + Map activeSubclusters = + getActiveSubclusters(); + if (!activeSubclusters.keySet().contains(homeSubcluster)) { + LOG.info("The homeSubCluster (" + homeSubcluster + ") we are " + + "defaulting to is not active, the ResourceRequest " + + "will be ignored."); + return answer; + } + answer.put(homeSubcluster,resourceRequests); + return answer; + } + + @Override + public void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) throws YarnException { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/SimplePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/SimplePolicyManager.java new file mode 100644 index 0000000..2b6e2ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/SimplePolicyManager.java @@ -0,0 +1,10 @@ +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.SimpleAMRMProxyPolicy; + +public class SimplePolicyManager extends AbstractPolicyManager { + + public SimplePolicyManager() { + amrmProxyFederationPolicy = SimpleAMRMProxyPolicy.class; + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 3f4a110..93ece8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -360,8 +360,14 @@ public class UnmanagedApplicationManager { protected Token initializeUnmanagedAM( ApplicationId appId) throws IOException, YarnException { try { - UserGroupInformation appSubmitter = - UserGroupInformation.createRemoteUser(this.submitter); + UserGroupInformation appSubmitter; + if (UserGroupInformation.isSecurityEnabled()) { + appSubmitter = + UserGroupInformation.createProxyUser(this.submitter, UserGroupInformation.getLoginUser()); + } else { + appSubmitter = + UserGroupInformation.createRemoteUser(this.submitter); + } this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, appSubmitter, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 1ceb462..616a2b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -153,6 +153,7 @@ public class YarnClientImpl extends YarnClient { private boolean timelineV1ServiceEnabled; protected 
boolean timelineServiceBestEffort; private boolean loadResourceTypesFromServer; + private String jobHomeCluster; private static final String ROOT = "root"; @@ -203,6 +204,8 @@ public class YarnClientImpl extends YarnClient { YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, YarnConfiguration.DEFAULT_YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER); + jobHomeCluster = conf.get(YarnConfiguration.JOB_SUBMIT_HOME_CLUSTER); + super.serviceInit(conf); } @@ -272,6 +275,11 @@ public class YarnClientImpl extends YarnClient { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } + + if (jobHomeCluster != null && !jobHomeCluster.isEmpty()) { + appContext.setHomeCluster(jobHomeCluster); + } + SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); @@ -622,6 +630,9 @@ public class YarnClientImpl extends YarnClient { GetDelegationTokenRequest rmDTRequest = Records.newRecord(GetDelegationTokenRequest.class); rmDTRequest.setRenewer(renewer.toString()); + if (jobHomeCluster != null && !jobHomeCluster.isEmpty()) { + rmDTRequest.setCluster(jobHomeCluster); + } GetDelegationTokenResponse response = rmClient.getDelegationToken(rmDTRequest); return response.getRMDelegationToken(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 20b4a6c..7e459e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3200,6 +3200,10 @@ public class YarnConfiguration extends Configuration { public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; + public static 
final String ROUTER_KEYTAB = ROUTER_PREFIX + "keytab"; + + public static final String ROUTER_PRINCIPAL = ROUTER_PREFIX + "principal"; + public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host"; public static final String ROUTER_CLIENTRM_PREFIX = @@ -3286,6 +3290,9 @@ public class YarnConfiguration extends Configuration { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; + public static final String JOB_SUBMIT_HOME_CLUSTER = + "job.submit.home-cluster"; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ZKDelegationTokenFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ZKDelegationTokenFetcher.java new file mode 100644 index 0000000..d81c86d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/ZKDelegationTokenFetcher.java @@ -0,0 +1,211 @@ +package org.apache.hadoop.yarn.server.router.security; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.curator.ZKCuratorManager; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreUtils; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; + 
+public class ZKDelegationTokenFetcher extends DelegationTokenFetcher { + + private static final Log LOG = LogFactory.getLog(ZKDelegationTokenFetcher.class); + + protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot"; + + protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_"; + protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; + + + private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = + "RMDelegationTokensRoot"; + private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = + "RMDTSequentialNumber"; + private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = + "RMDTMasterKeysRoot"; + public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; + + private ZKCuratorManager zkManager; + private String rootPath; + + public ZKDelegationTokenFetcher(Configuration conf, ZKCuratorManager zkcuratorManager, + RouterDelegationTokenSecretManager secretManager) throws Exception { + super(secretManager); + this.zkManager = zkcuratorManager; + this.rootPath = conf.get("yarn.resourcemanager.zk-state-store.rootpath", "/federation"); + } + + @Override + public void start() throws Exception { + PathChildrenCache subClusterCache = new PathChildrenCache(zkManager.getCurator(), rootPath, true); + subClusterCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + subClusterCache.getListenable().addListener(new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + case CHILD_REMOVED: + break; + } + } + }); + + for (ChildData data : subClusterCache.getCurrentData()) { + processSubcluster(data.getPath(), data.getData()); + } + } + + private void processSubcluster(String path, byte[] data) throws Exception { + LOG.info("Monitor Subcluster path: " + path); + String rootPath = path + "/" + ROOT_ZNODE_NAME + "/" + 
RM_DT_SECRET_MANAGER_ROOT; + zkManager.createRootDirRecursively(rootPath); + monitorMasterKey(rootPath + "/" + RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME); + monitorDelegationToken(rootPath + "/" + RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME); + } + + private void monitorDelegationToken(String path) throws Exception { + if (!zkManager.exists(path)) + zkManager.create(path); + LOG.info("Monitor DelegationToken path: " + path); + PathChildrenCache delegationTokenCache = new PathChildrenCache(zkManager.getCurator(), path,true); + delegationTokenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + + PathChildrenCacheListener listener = new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + LOG.info("Path: " + event.getData().getPath() + " Type: " + event.getType()); + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + processDTNode(event.getData().getPath(), event.getData().getData(), true); + break; + case CHILD_REMOVED: + processDTNode(event.getData().getPath(), event.getData().getData(), false); + break; + default: + break; + } + } + }; + delegationTokenCache.getListenable().addListener(listener); + for (ChildData data : delegationTokenCache.getCurrentData()) { + processDTNode(data.getPath(), data.getData(), true); + } + } + + private void monitorMasterKey(String path) throws Exception { + if (!zkManager.exists(path)) + zkManager.create(path); + LOG.info("Monitor MasterKey path: " + path); + PathChildrenCache masterKeyCache = new PathChildrenCache(zkManager.getCurator(), path, true); + masterKeyCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + PathChildrenCacheListener listener = new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + LOG.info("Path: " + event.getData().getPath() + " Type: " + event.getType()); + switch (event.getType()) { + case 
CHILD_ADDED: + case CHILD_UPDATED: + processKeyNode(event.getData().getPath(), event.getData().getData(), true); + break; + case CHILD_REMOVED: + break; + default: + break; + } + } + }; + masterKeyCache.getListenable().addListener(listener); + for (ChildData data : masterKeyCache.getCurrentData()) { + processKeyNode(data.getPath(), data.getData(), true); + } + } + + private void processKeyNode(String path, byte[] data, boolean isUpdate) throws Exception { + if (!getChildName(path).startsWith(DELEGATION_KEY_PREFIX)) { + LOG.info("path: " + path + " does not start with " + DELEGATION_KEY_PREFIX); + return; + } + + if (data == null) { + LOG.warn("Content of " + path + " is broken."); + } else { + String cluster = getClusterName(path); + ByteArrayInputStream is = new ByteArrayInputStream(data); + try (DataInputStream fsIn = new DataInputStream(is)) { + DelegationKey key = new DelegationKey(); + key.readFields(fsIn); + if (isUpdate) { + ExtendDelegationKey ekey = new ExtendDelegationKey(cluster, key); + updateMasterKey(ekey); + if (LOG.isInfoEnabled()) { + LOG.info("Loaded delegation key: keyId=" + key.getKeyId() + + ", expirationDate=" + key.getExpiryDate()); + } + } + } + } + } + + private void processDTNode(String path, byte[] data, boolean isUpdate) throws Exception { + if (!getChildName(path).startsWith(DELEGATION_TOKEN_PREFIX)) { + LOG.info("path: " + path + " does not start with " + DELEGATION_TOKEN_PREFIX); + return; + } + + if (data == null) { + LOG.warn("Content of " + path + " is broken."); + } else { + String cluster = getClusterName(path); + ByteArrayInputStream is = new ByteArrayInputStream(data); + try (DataInputStream fsIn = new DataInputStream(is)) { + RMDelegationTokenIdentifierData identifierData = + RMStateStoreUtils.readRMDelegationTokenIdentifierData(fsIn); + RMDelegationTokenIdentifier identifier = + identifierData.getTokenIdentifier(); + long renewDate = identifierData.getRenewDate(); + if (isUpdate) { + ExtendRMDelegationTokenIdentifier eIdentifier 
= new ExtendRMDelegationTokenIdentifier(cluster, identifier); + updateToken(eIdentifier, renewDate); + if (LOG.isInfoEnabled()) { + LOG.info("Loaded RMDelegationTokenIdentifier: " + identifier + + " renewDate=" + renewDate); + } + } else { + Token fakeToken = new Token(identifier.getBytes(), null, null, null); + removeToken(fakeToken, identifier.getUser().getUserName()); + if (LOG.isInfoEnabled()) { + LOG.info("Removed RMDelegationTokenIdentifier: " + identifier + + " renewDate=" + renewDate); + } + } + } + } + } + + private String getChildName(String path) { + int index = path.lastIndexOf("/"); + if (index == -1) + return path; + else + return path.substring(index + 1); + } + + private String getClusterName(String path) { + if (path.startsWith(rootPath)) { + String subPath = path.substring(rootPath.length() + 1); + int index = subPath.indexOf("/"); + return subPath.substring(0, index); + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 5e200dc..c53245a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -479,6 +479,7 @@ message ApplicationSubmissionContextProto { repeated ResourceRequestProto am_container_resource_request = 17; repeated ApplicationTimeoutMapProto application_timeouts = 18; repeated StringStringMapProto application_scheduling_properties = 19; + optional string homeCluster = 20 [default = ""]; } enum ApplicationTimeoutTypeProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 92a65ad..9ca1635 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -172,6 +172,7 @@ message KillApplicationResponseProto { } message GetClusterMetricsRequestProto { + optional string cluster_id = 1; } message GetClusterMetricsResponseProto {