diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 32bbe96..9eb9b83 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -1013,4 +1013,10 @@
* A comma-separated list of properties whose value will be redacted.
*/
String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
+
+ String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf";
+
+ String MR_JOB_SEND_TOKEN_CONF_DEFAULT =
+ "dfs.nameservices|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$|^dfs.client.failover.proxy.provider.*$";
+
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1b4f373..8a223e7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1989,4 +1989,19 @@
mapreduce.job.redacted-properties
+
+
+
+ This configuration is a regex expression. The list of configurations that
+ match the regex expression will be sent to RM. RM will use these
+ configurations for renewing tokens.
+ This configuration is added for below scenario: User needs to run distcp
+ jobs across two clusters, but the RM does not have necessary hdfs
+ configurations to connect to the remote hdfs cluster. Hence, user rely on
+ this config to send the list configurations to RM and RM uses these configs
+ to renew tokens.
+
+ mapreduce.job.send-token-conf
+
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 4c6f0f3..2d90dda 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -499,6 +499,13 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
ContainerLaunchContext.newInstance(localResources, environment,
vargsFinal, null, securityTokens, acls);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
+ if (regex != null && !regex.isEmpty()) {
+ setAppConf(amContainer, conf, regex);
+ }
+ }
+
Collection tagsFromConf =
jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);
@@ -576,6 +583,28 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
return appContext;
}
+ private void setAppConf(ContainerLaunchContext context,
+ Configuration conf, String regex) throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ Configuration copy = new Configuration(false);
+ copy.clear();
+ int count = 0;
+ for (Map.Entry map : conf) {
+ String key = map.getKey();
+ String val = map.getValue();
+ if (key.matches(regex)) {
+ copy.set(key, val);
+ count++;
+ }
+ }
+ copy.write(dob);
+ ByteBuffer appConf = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ LOG.info("Send configurations that match regex expression: " + regex
+ + " , total number of configs: " + count + ", total size : " + dob
+ .getLength() + " bytes.");
+ context.setTokensConf(appConf);
+ }
+
@Override
public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index 0e17ac8..289bf98 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -40,6 +40,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -99,6 +101,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
@@ -106,6 +109,7 @@
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -675,4 +679,37 @@ private ApplicationSubmissionContext buildSubmitContext(
return yarnRunner.createApplicationSubmissionContext(jobConf,
testWorkDir.toString(), new Credentials());
}
+
+ // Test configs that match regex expression should be set in
+ // containerLaunchContext
+ @Test
+ public void testSendJobConf() throws IOException {
+ JobConf jobConf = new JobConf();
+ jobConf.set("dfs.nameservices", "mycluster1,mycluster2");
+ jobConf.set("dfs.namenode.rpc-address.mycluster2.nn1", "123.0.0.1");
+ jobConf.set("dfs.namenode.rpc-address.mycluster2.nn2", "123.0.0.2");
+ jobConf.set("dfs.ha.namenodes.mycluster2", "nn1,nn2");
+ jobConf.set("dfs.client.failover.proxy.provider.mycluster2", "provider");
+ jobConf.set("hadoop.tmp.dir", "testconfdir");
+ jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ jobConf.set("mapreduce.job.send-token-conf",
+ MRJobConfig.MR_JOB_SEND_TOKEN_CONF_DEFAULT);
+ UserGroupInformation.setConfiguration(jobConf);
+
+ YARNRunner yarnRunner = new YARNRunner(jobConf);
+ ApplicationSubmissionContext submissionContext =
+ buildSubmitContext(yarnRunner, jobConf);
+ Configuration confSent = BuilderUtils.parseTokensConf(submissionContext);
+
+ // configs that match regex should be included
+ Assert.assertTrue(confSent.get("dfs.namenode.rpc-address.mycluster2.nn1")
+ .equals("123.0.0.1"));
+ Assert.assertTrue(confSent.get("dfs.namenode.rpc-address.mycluster2.nn2")
+ .equals("123.0.0.2"));
+
+ // configs that aren't matching regex should not be included
+ Assert.assertTrue(confSent.get("hadoop.tmp.dir") == null || !confSent
+ .get("hadoop.tmp.dir").equals("testconfdir"));
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
index 6d4bccd..616aa4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
@@ -108,6 +108,22 @@ public static ContainerLaunchContext newInstance(
public abstract void setTokens(ByteBuffer tokens);
/**
+ * Get the configuration used by RM to renew tokens.
+ * @return The configuration used by RM to renew the tokens.
+ */
+ @Public
+ @Unstable
+ public abstract ByteBuffer getTokensConf();
+
+ /**
+ * Set the configuration used by RM to renew the tokens.
+ * @param tokensConf The configuration used by RM to renew the tokens
+ */
+ @Public
+ @Unstable
+ public abstract void setTokensConf(ByteBuffer tokensConf);
+
+ /**
* Get LocalResource required by the container.
* @return all LocalResource required by the container
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 5a70298..c805261 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -547,6 +547,8 @@ message ContainerLaunchContextProto {
repeated string command = 5;
repeated ApplicationACLMapProto application_ACLs = 6;
optional ContainerRetryContextProto container_retry_context = 7;
+ optional bytes tokens_conf = 8;
+
}
message ContainerStatusProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
index 1efe541..1f76c34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
@@ -54,6 +54,7 @@
private Map localResources = null;
private ByteBuffer tokens = null;
+ private ByteBuffer tokensConf = null;
private Map serviceData = null;
private Map environment = null;
private List commands = null;
@@ -111,6 +112,9 @@ private void mergeLocalToBuilder() {
if (this.tokens != null) {
builder.setTokens(convertToProtoFormat(this.tokens));
}
+ if (this.tokensConf != null) {
+ builder.setTokensConf(convertToProtoFormat(this.tokensConf));
+ }
if (this.serviceData != null) {
addServiceDataToProto();
}
@@ -268,6 +272,28 @@ public void setTokens(ByteBuffer tokens) {
}
@Override
+ public ByteBuffer getTokensConf() {
+ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.tokensConf != null) {
+ return this.tokensConf;
+ }
+ if (!p.hasTokensConf()) {
+ return null;
+ }
+ this.tokensConf = convertFromProtoFormat(p.getTokensConf());
+ return this.tokensConf;
+ }
+
+ @Override
+ public void setTokensConf(ByteBuffer tokensConf) {
+ maybeInitBuilder();
+ if (tokensConf == null) {
+ builder.clearTokensConf();
+ }
+ this.tokensConf = tokensConf;
+ }
+
+ @Override
public Map getServiceData() {
initServiceData();
return this.serviceData;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 74c06ff..ee443de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -29,8 +29,11 @@
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
@@ -496,4 +499,31 @@ public static AllocateResponse newAllocateResponse(int responseId,
return response;
}
+
+ public static Credentials parseCredentials(
+ ApplicationSubmissionContext application) throws IOException {
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ ByteBuffer tokens = application.getAMContainerSpec().getTokens();
+ if (tokens != null) {
+ dibb.reset(tokens);
+ credentials.readTokenStorageStream(dibb);
+ tokens.rewind();
+ }
+ return credentials;
+ }
+
+ public static Configuration parseTokensConf(
+ ApplicationSubmissionContext context) throws IOException {
+ ByteBuffer tokensConf = context.getAMContainerSpec().getTokensConf();
+ if (tokensConf == null) {
+ return null;
+ }
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(tokensConf);
+ Configuration appConf = new Configuration();
+ appConf.readFields(dibb);
+ tokensConf.rewind();
+ return appConf;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 3dc7e38..85fb7bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -607,6 +607,15 @@ public SubmitApplicationResponse submitApplication(
}
}
+ if (!UserGroupInformation.isSecurityEnabled() &&
+ request.getApplicationSubmissionContext().getAMContainerSpec()
+ .getTokensConf() != null) {
+ throw new YarnException(
+ "Token conf does not need to be set in non-secure environment for "
+ + submissionContext.getApplicationId());
+
+ }
+
// Check whether app has already been put into rmContext,
// If it is, simply return the response
if (rmContext.getRMApps().get(applicationId) != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 4d628ee..8579d69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -17,18 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -297,14 +293,14 @@ protected void submitApplication(
// constructor.
RMAppImpl application = createAndPopulateNewRMApp(
submissionContext, submitTime, user, false, -1);
- Credentials credentials = null;
try {
- credentials = parseCredentials(submissionContext);
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer()
- .addApplicationAsync(applicationId, credentials,
+ .addApplicationAsync(applicationId,
+ BuilderUtils.parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete(),
- application.getUser());
+ application.getUser(),
+ BuilderUtils.parseTokensConf(submissionContext));
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
@@ -466,20 +462,7 @@ private ResourceRequest validateAndCreateResourceRequest(
return null;
}
-
- protected Credentials parseCredentials(
- ApplicationSubmissionContext application) throws IOException {
- Credentials credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- ByteBuffer tokens = application.getAMContainerSpec().getTokens();
- if (tokens != null) {
- dibb.reset(tokens);
- credentials.readTokenStorageStream(dibb);
- tokens.rewind();
- }
- return credentials;
- }
-
+
@Override
public void recover(RMState state) throws Exception {
RMStateStore store = rmContext.getStateStore();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index a647969..08a2594 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1103,9 +1103,12 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
try {
app.rmContext.getDelegationTokenRenewer()
.addApplicationAsyncDuringRecovery(app.getApplicationId(),
- app.parseCredentials(),
+ BuilderUtils.parseCredentials(app.submissionContext),
app.submissionContext.getCancelTokensWhenComplete(),
- app.getUser());
+ app.getUser(),
+ BuilderUtils.parseTokensConf(app.submissionContext));
+ // set the memory free
+ app.submissionContext.getAMContainerSpec().setTokensConf(null);
} catch (Exception e) {
String msg = "Failed to fetch user credentials from application:"
+ e.getMessage();
@@ -1158,6 +1161,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
app.submissionContext, false, app.applicationPriority));
// send the ATS create Event
app.sendATSCreateEvent();
+ // Set the memory free after submission context is persisted
+ app.submissionContext.getAMContainerSpec().setTokensConf(null);
}
}
@@ -1473,6 +1478,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
.applicationFinished(app, finalState);
app.rmContext.getSystemMetricsPublisher()
.appFinished(app, finalState, app.finishTime);
+ // set the memory free
+ app.submissionContext.getAMContainerSpec().setTokensConf(null);
};
}
@@ -1682,18 +1689,6 @@ public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
- protected Credentials parseCredentials() throws IOException {
- Credentials credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- ByteBuffer tokens = submissionContext.getAMContainerSpec().getTokens();
- if (tokens != null) {
- dibb.reset(tokens);
- credentials.readTokenStorageStream(dibb);
- tokens.rewind();
- }
- return credentials;
- }
-
@Override
public Map getLogAggregationReportsForApp() {
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index dfbf333..35934de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -379,43 +379,43 @@ public Void run() throws Exception {
* @param applicationId added application
* @param ts tokens
* @param shouldCancelAtEnd true if tokens should be canceled when the app is
- * done else false.
+ * done else false.
* @param user user
+ * @param appConf appConf sent by the app-submitter
*/
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
- boolean shouldCancelAtEnd, String user) {
+ boolean shouldCancelAtEnd, String user, Configuration appConf) {
processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
- applicationId, ts, shouldCancelAtEnd, user));
+ applicationId, ts, shouldCancelAtEnd, user, appConf));
}
/**
* Asynchronously add application tokens for renewal.
- *
- * @param applicationId
+ * @param applicationId
* added application
* @param ts
* tokens
* @param shouldCancelAtEnd
- * true if tokens should be canceled when the app is done else false.
- * @param user
- * user
+ * true if tokens should be canceled when the app is done else false.
+ * @param user user
+ * @param appConf appConf sent by the app-submitter
*/
public void addApplicationAsyncDuringRecovery(ApplicationId applicationId,
- Credentials ts, boolean shouldCancelAtEnd, String user) {
+ Credentials ts, boolean shouldCancelAtEnd, String user,
+ Configuration appConf) {
processDelegationTokenRenewerEvent(
new DelegationTokenRenewerAppRecoverEvent(applicationId, ts,
- shouldCancelAtEnd, user));
+ shouldCancelAtEnd, user, appConf));
}
- /**
- * Synchronously renew delegation tokens.
- * @param user user
- */
+
+ // Only for testing
+ // Synchronously renew delegation tokens.
public void addApplicationSync(ApplicationId applicationId, Credentials ts,
boolean shouldCancelAtEnd, String user) throws IOException,
InterruptedException {
handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
- applicationId, ts, shouldCancelAtEnd, user));
+ applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
}
private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
@@ -455,8 +455,19 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
DelegationTokenToRenew dttr = allTokens.get(token);
if (dttr == null) {
+ Configuration appConf;
+ if (evt.appConf != null) {
+ appConf = new Configuration(getConfig());
+ // Override conf with app provided conf - this is required in cases
+ // where RM does not have the required conf to communicate with
+ // remote hdfs cluster. The conf is provided by the application
+ // itself.
+ appConf.addResource(evt.appConf);
+ } else {
+ appConf = getConfig();
+ }
dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
- getConfig(), now, shouldCancelAtEnd, evt.getUser());
+ appConf, now, shouldCancelAtEnd, evt.getUser());
try {
renewToken(dttr);
} catch (IOException ioe) {
@@ -926,22 +937,22 @@ private void handleDTRenewerAppRecoverEvent(
}
static class DelegationTokenRenewerAppSubmitEvent
- extends
- AbstractDelegationTokenRenewerAppEvent {
+ extends AbstractDelegationTokenRenewerAppEvent {
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
- Credentials credentails, boolean shouldCancelAtEnd, String user) {
+ Credentials credentails, boolean shouldCancelAtEnd, String user,
+ Configuration appConf) {
super(appId, credentails, shouldCancelAtEnd, user,
- DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+ DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION, appConf);
}
}
static class DelegationTokenRenewerAppRecoverEvent
- extends
- AbstractDelegationTokenRenewerAppEvent {
+ extends AbstractDelegationTokenRenewerAppEvent {
public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
- Credentials credentails, boolean shouldCancelAtEnd, String user) {
+ Credentials credentails, boolean shouldCancelAtEnd, String user,
+ Configuration appConf) {
super(appId, credentails, shouldCancelAtEnd, user,
- DelegationTokenRenewerEventType.RECOVER_APPLICATION);
+ DelegationTokenRenewerEventType.RECOVER_APPLICATION, appConf);
}
}
@@ -949,16 +960,18 @@ public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
DelegationTokenRenewerEvent {
private Credentials credentials;
+ private Configuration appConf;
private boolean shouldCancelAtEnd;
private String user;
public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId,
- Credentials credentails, boolean shouldCancelAtEnd, String user,
- DelegationTokenRenewerEventType type) {
+ Credentials credentials, boolean shouldCancelAtEnd, String user,
+ DelegationTokenRenewerEventType type, Configuration appConf) {
super(appId, type);
- this.credentials = credentails;
+ this.credentials = credentials;
this.shouldCancelAtEnd = shouldCancelAtEnd;
this.user = user;
+ this.appConf = appConf;
}
public Credentials getCredentials() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 9223ef3..61e4437 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -110,6 +110,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -1732,26 +1733,16 @@ public void testAppFailedOnSubmissionSavedInStateStore() throws Exception {
memStore.init(conf);
MockRM rm1 = new TestSecurityMockRM(conf, memStore) {
- @Override
- protected RMAppManager createRMAppManager() {
- return new TestRMAppManager(this.rmContext, this.scheduler,
- this.masterService, this.applicationACLsManager, conf);
- }
-
- class TestRMAppManager extends RMAppManager {
-
- public TestRMAppManager(RMContext context, YarnScheduler scheduler,
- ApplicationMasterService masterService,
- ApplicationACLsManager applicationACLsManager, Configuration conf) {
- super(context, scheduler, masterService, applicationACLsManager, conf);
- }
-
- @Override
- protected Credentials parseCredentials(
- ApplicationSubmissionContext application) throws IOException {
- throw new IOException("Parsing credential error.");
+ class TestDelegationTokenRenewer extends DelegationTokenRenewer {
+ public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
+ boolean shouldCancelAtEnd, String user, Configuration appConf) {
+ throw new RuntimeException("failed to submit app");
}
}
+ @Override
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
+ return new TestDelegationTokenRenewer();
+ }
};
rm1.start();
RMApp app1 =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 205188b..4487e1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -392,7 +392,8 @@ public void testDTRenewal () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 =
BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
+ delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user",
+ new Configuration());
waitForEventsToGetProcessed(delegationTokenRenewer);
// first 3 initial renewals + 1 real
@@ -432,7 +433,8 @@ public void testDTRenewal () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
+ delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user",
+ new Configuration());
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -468,7 +470,8 @@ public void testAppRejectionWithCancelledDelegationToken() throws Exception {
// register the tokens for renewal
ApplicationId appId = BuilderUtils.newApplicationId(0, 0);
- delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
+ delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user",
+ new Configuration());
int waitCnt = 20;
while (waitCnt-- >0) {
if (!eventQueue.isEmpty()) {
@@ -531,7 +534,8 @@ public void testDTRenewalWithNoCancel () throws Exception {
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
- delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
+ delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user",
+ new Configuration());
waitForEventsToGetProcessed(delegationTokenRenewer);
delegationTokenRenewer.applicationFinished(applicationId_1);
waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -600,7 +604,8 @@ public void testDTKeepAlive1 () throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
+ localDtr.addApplicationAsync(applicationId_0, ts, true, "user",
+ new Configuration());
waitForEventsToGetProcessed(localDtr);
if (!eventQueue.isEmpty()){
Event evt = eventQueue.take();
@@ -679,7 +684,8 @@ public void testDTKeepAlive2() throws Exception {
// register the tokens for renewal
ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
- localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
+ localDtr.addApplicationAsync(applicationId_0, ts, true, "user",
+ new Configuration());
localDtr.applicationFinished(applicationId_0);
waitForEventsToGetProcessed(delegationTokenRenewer);
//Send another keep alive.
@@ -831,14 +837,16 @@ public Long answer(InvocationOnMock invocation)
Thread submitThread = new Thread() {
@Override
public void run() {
- dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
+ dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user",
+ new Configuration());
}
};
submitThread.start();
// wait till 1st submit blocks, then submit another
startBarrier.await();
- dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
+ dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user",
+ new Configuration());
// signal 1st to complete
endBarrier.await();
submitThread.join();