diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 723a2e0..ccf3b6b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -54,6 +54,7 @@
* validityInterval into failure count. If failure count reaches to
* maxAppAttempts, the application will be failed.
*
+ *
Optional, application-specific {@link LogContext}
*
*
*
@@ -128,6 +129,22 @@ public static ApplicationSubmissionContext newInstance(
return context;
}
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers, LogContext logContext) {
+ ApplicationSubmissionContext context =
+ newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers);
+ context.setLogContext(logContext);
+ return context;
+ }
+
/**
* Get the ApplicationId of the submitted application.
* @return ApplicationId of the submitted application
@@ -381,4 +398,23 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
@Stable
public abstract void setAttemptFailuresValidityInterval(
long attemptFailuresValidityInterval);
+
+ /**
+ * Get LogContext of the application
+ *
+ * @return LogContext of the application
+ */
+ @Public
+ @Stable
+ public abstract LogContext getLogContext();
+
+ /**
+ * Set LogContext for the application
+ *
+ * @param logContext
+ * for the application
+ */
+ @Public
+ @Stable
+ public abstract void setLogContext(LogContext logContext);
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
new file mode 100644
index 0000000..85bcaba
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogContext.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * LogContext represents all of the
+ * information needed by the NodeManager to handle
+ * the logs for an application.
+ *
+ * It includes details such as:
+ *
+ * - logIncludePatterns. It defines include patterns which is used to
+ * filter the log files. The log files which match the include
+ * patterns will be uploaded.
+ * - logExcludePatterns. It defines exclude patterns which is used to
+ * filter the log files. The log files which match the include
+ * patterns will not be uploaded. if the log file name matches both the
+ * include and the exclude patterns, the file will be excluded eventually
+ * - logInterval. The default value is 0. By default,
+ * the logAggregationService only uploads container logs when
+ * the application is finished. This configure defines
+ * how often the logAggregationSerivce uploads container logs in seconds
+ * By setting this configure, the logAggregationSerivce can upload container
+ * logs periodically when the application is running.
+ *
+ *
+ *
+ *
+ * @see ContainerLaunchContext
+ */
+public abstract class LogContext {
+
+ @Public
+ @Stable
+ public static LogContext newInstance(Set logIncludePatterns,
+ Set logExcludePatterns, long logInterval) {
+ Preconditions.checkArgument(logInterval >= 0);
+ LogContext context = Records.newRecord(LogContext.class);
+ context.setLogIncludePatterns(logIncludePatterns);
+ context.setLogExcludePatterns(logExcludePatterns);
+ context.setLogInterval(logInterval);
+ return context;
+ }
+
+ /**
+ * Get include patterns
+ * @return set of include patterns
+ */
+ @Public
+ @Stable
+ public abstract Set getLogIncludePatterns();
+
+ /**
+ * Set include patterns
+ * @param includePatterns to set
+ */
+ @Public
+ @Stable
+ public abstract void setLogIncludePatterns(Set includePatterns);
+
+ /**
+ * Get exclude patterns
+ * @return set of exclude patterns
+ */
+ @Public
+ @Stable
+ public abstract Set getLogExcludePatterns();
+
+ /**
+ * Set exclude patterns
+ * @param excludePatterns to set
+ */
+ @Public
+ @Stable
+ public abstract void setLogExcludePatterns(Set excludePatterns);
+
+ /**
+ * Get LogInterval in seconds
+ * @return the logInterval
+ */
+ @Public
+ @Stable
+ public abstract long getLogInterval();
+
+ /**
+ * Set LogInterval in seconds
+ * @param logInterval
+ */
+ @Public
+ @Stable
+ public abstract void setLogInterval(long logInterval);
+
+ @Private
+ public abstract LogContextProto getProto();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d8c42cc..faa307e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -292,6 +292,13 @@ message ApplicationSubmissionContextProto {
optional bool keep_containers_across_application_attempts = 11 [default = false];
repeated string applicationTags = 12;
optional int64 attempt_failures_validity_interval = 13 [default = -1];
+ optional LogContextProto log_context = 14;
+}
+
+message LogContextProto {
+ repeated string log_include_pattern = 1;
+ repeated string log_exclude_pattern = 2;
+ optional int64 log_monitor_interval = 3 [default = 0];
}
enum ApplicationAccessTypeProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 7b49a16..d7a0314 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher;
+
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -31,6 +33,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -53,6 +56,7 @@
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set applicationTags = null;
+ private LogContext logContext = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@@ -110,6 +114,9 @@ private void mergeLocalToBuilder() {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.logContext != null) {
+ builder.setLogContext(convertToProtoFormat(this.logContext));
+ }
}
private void mergeLocalToProto() {
@@ -415,4 +422,33 @@ public void setAttemptFailuresValidityInterval(
maybeInitBuilder();
builder.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
+
+ private LogContextPBImpl convertFromProtoFormat(LogContextProto p) {
+ return new LogContextPBImpl(p);
+ }
+
+ private LogContextProto convertToProtoFormat(LogContext t) {
+ return ((LogContextPBImpl) t).getProto();
+ }
+
+ @Override
+ public LogContext getLogContext() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.logContext != null) {
+ return this.logContext;
+ } // Else via proto
+ if (!p.hasLogContext()) {
+ return null;
+ }
+ logContext = convertFromProtoFormat(p.getLogContext());
+ return logContext;
+ }
+
+ @Override
+ public void setLogContext(LogContext logContext) {
+ maybeInitBuilder();
+ if (logContext == null)
+ builder.clearLogContext();
+ this.logContext = logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
new file mode 100644
index 0000000..ceea8e2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogContextPBImpl.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.LogContext;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProtoOrBuilder;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogContextPBImpl extends LogContext {
+ LogContextProto proto = LogContextProto.getDefaultInstance();
+ LogContextProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private Set logIncludePatterns = null;
+ private Set logExcludePatterns = null;
+
+ public LogContextPBImpl() {
+ builder = LogContextProto.newBuilder();
+ }
+
+ public LogContextPBImpl(LogContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public LogContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.logIncludePatterns != null && !this.logIncludePatterns.isEmpty()) {
+ builder.clearLogIncludePattern();
+ builder.addAllLogIncludePattern(this.logIncludePatterns);
+ }
+
+ if (this.logExcludePatterns != null && !this.logExcludePatterns.isEmpty()) {
+ builder.clearLogExcludePattern();
+ builder.addAllLogExcludePattern(this.logExcludePatterns);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LogContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public Set getLogIncludePatterns() {
+ initLogIncludePatterns();
+ return this.logIncludePatterns;
+ }
+
+ @Override
+ public void setLogIncludePatterns(Set logPattern) {
+ maybeInitBuilder();
+ if (logPattern == null || logPattern.isEmpty()) {
+ builder.clearLogIncludePattern();
+ this.logIncludePatterns = null;
+ return;
+ }
+
+ this.logIncludePatterns = new HashSet();
+ this.logIncludePatterns.addAll(logPattern);
+ }
+
+ private void initLogIncludePatterns() {
+ if (this.logIncludePatterns != null) {
+ return;
+ }
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ this.logIncludePatterns = new HashSet();
+ this.logIncludePatterns.addAll(p.getLogIncludePatternList());
+ }
+
+ @Override
+ public Set getLogExcludePatterns() {
+ initLogExcludePatterns();
+ return this.logExcludePatterns;
+ }
+
+ @Override
+ public void setLogExcludePatterns(Set logPattern) {
+ maybeInitBuilder();
+ if (logPattern == null || logPattern.isEmpty()) {
+ builder.clearLogExcludePattern();
+ this.logExcludePatterns = null;
+ return;
+ }
+
+ this.logExcludePatterns = new HashSet();
+ this.logExcludePatterns.addAll(logPattern);
+ }
+
+ private void initLogExcludePatterns() {
+ if (this.logExcludePatterns != null) {
+ return;
+ }
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ this.logExcludePatterns = new HashSet();
+ this.logExcludePatterns.addAll(p.getLogExcludePatternList());
+ }
+
+ @Override
+ public long getLogInterval() {
+ LogContextProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getLogMonitorInterval();
+ }
+
+ @Override
+ public void setLogInterval(long logInterval) {
+ Preconditions.checkArgument(logInterval >= 0);
+ maybeInitBuilder();
+ builder.setLogMonitorInterval(logInterval);
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
index ca847e0..58d76a7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
@@ -34,8 +34,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.LogContextProto;
/**
* TokenIdentifier for a container. Encodes {@link ContainerId},
@@ -59,6 +62,7 @@
private long rmIdentifier;
private Priority priority;
private long creationTime;
+ private LogContext logContext;
public ContainerTokenIdentifier(ContainerId containerID,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
@@ -74,6 +78,22 @@ public ContainerTokenIdentifier(ContainerId containerID,
this.creationTime = creationTime;
}
+ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
+ String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
+ long rmIdentifier, Priority priority, long creationTime,
+ LogContext logContext) {
+ this.containerId = containerID;
+ this.nmHostAddr = hostName;
+ this.appSubmitter = appSubmitter;
+ this.resource = r;
+ this.expiryTimeStamp = expiryTimeStamp;
+ this.masterKeyId = masterKeyId;
+ this.rmIdentifier = rmIdentifier;
+ this.priority = priority;
+ this.creationTime = creationTime;
+ this.logContext = logContext;
+ }
+
/**
* Default constructor needed by RPC layer/SecretManager.
*/
@@ -111,6 +131,10 @@ public Priority getPriority() {
public long getCreationTime() {
return this.creationTime;
}
+
+ public LogContext getLogContext() {
+ return this.logContext;
+ }
/**
* Get the RMIdentifier of RM in which containers are allocated
* @return RMIdentifier
@@ -138,6 +162,13 @@ public void write(DataOutput out) throws IOException {
out.writeLong(this.rmIdentifier);
out.writeInt(this.priority.getPriority());
out.writeLong(this.creationTime);
+ if (this.logContext == null) {
+ out.writeInt(-1);
+ } else {
+ byte[] logContextByte = this.logContext.getProto().toByteArray();
+ out.writeInt(logContextByte.length);
+ out.write(logContextByte);
+ }
}
@Override
@@ -158,6 +189,17 @@ public void readFields(DataInput in) throws IOException {
this.rmIdentifier = in.readLong();
this.priority = Priority.newInstance(in.readInt());
this.creationTime = in.readLong();
+ int size = in.readInt();
+ if (size != -1) {
+ byte[] bytes = new byte[size];
+ in.readFully(bytes);
+ LogContextPBImpl logContextData =
+ new LogContextPBImpl(LogContextProto.parseFrom(bytes));
+ this.logContext =
+ LogContext.newInstance(logContextData.getLogIncludePatterns(),
+ logContextData.getLogExcludePatterns(),
+ logContextData.getLogInterval());
+ }
}
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index c6572e9..caacef8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -178,6 +178,7 @@ public static void setup() throws Exception {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(LogContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
generateByNewInstance(ContainerId.class);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 12166e0..ccd2cfc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -72,9 +72,11 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -275,11 +277,16 @@ private void recoverApplication(ContainerManagerApplicationProto p)
aclProto.getAcl());
}
+ LogContext logContext = null;
+ if (p.getLogContext() != null) {
+ logContext = new LogContextPBImpl(p.getLogContext());
+ }
+
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context);
context.getApplications().put(appId, app);
- app.handle(new ApplicationInitEvent(appId, acls));
+ app.handle(new ApplicationInitEvent(appId, acls, logContext));
}
@SuppressWarnings("unchecked")
@@ -719,13 +726,17 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
- Map appAcls) {
+ Map appAcls, LogContext logContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user);
+ if (logContext != null) {
+ builder.setLogContext(((LogContextPBImpl) logContext).getProto());
+ }
+
builder.clearCredentials();
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
@@ -826,12 +837,14 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
+ LogContext logContext = containerTokenIdentifier.getLogContext();
Map appAcls =
container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID,
- buildAppProto(applicationID, user, credentials, appAcls));
+ buildAppProto(applicationID, user, credentials, appAcls,
+ logContext));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, appAcls));
+ new ApplicationInitEvent(applicationID, appAcls, logContext));
}
this.context.getNMStateStore().storeContainer(containerId, request);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index cc5544c..aa7f166 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -72,6 +73,8 @@
private static final Log LOG = LogFactory.getLog(Application.class);
+ private LogContext logContext;
+
Map containers =
new HashMap();
@@ -234,10 +237,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.applicationACLs = initEvent.getApplicationACLs();
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
// Inform the logAggregator
+ app.logContext = initEvent.getLogContext();
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
- app.applicationACLs));
+ app.applicationACLs, app.logContext));
}
}
@@ -467,4 +471,13 @@ public void handle(ApplicationEvent event) {
public String toString() {
return appId.toString();
}
+
+ public LogContext getLogContext() {
+ try {
+ this.readLock.lock();
+ return this.logContext;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
index 5746b6a..fdd828f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
@@ -22,18 +22,32 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogContext;
public class ApplicationInitEvent extends ApplicationEvent {
private final Map applicationACLs;
+ private final LogContext logContext;
public ApplicationInitEvent(ApplicationId appId,
Map acls) {
super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls;
+ this.logContext = null;
+ }
+
+ public ApplicationInitEvent(ApplicationId appId,
+ Map acls, LogContext logContext) {
+ super(appId, ApplicationEventType.INIT_APPLICATION);
+ this.applicationACLs = acls;
+ this.logContext = logContext;
}
public Map getApplicationACLs() {
return this.applicationACLs;
}
+
+ public LogContext getLogContext() {
+ return this.logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
index 6c07674..28ae10a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
public class LogHandlerAppStartedEvent extends LogHandlerEvent {
@@ -32,6 +33,7 @@
private final String user;
private final Credentials credentials;
private final Map appAcls;
+ private final LogContext logContext;
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
@@ -42,8 +44,20 @@ public LogHandlerAppStartedEvent(ApplicationId appId, String user,
this.credentials = credentials;
this.retentionPolicy = retentionPolicy;
this.appAcls = appAcls;
+ this.logContext = null;
}
+ public LogHandlerAppStartedEvent(ApplicationId appId, String user,
+ Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
+ Map appAcls, LogContext logContext) {
+ super(LogHandlerEventType.APPLICATION_STARTED);
+ this.applicationId = appId;
+ this.user = user;
+ this.credentials = credentials;
+ this.retentionPolicy = retentionPolicy;
+ this.appAcls = appAcls;
+ this.logContext = logContext;
+ }
public ApplicationId getApplicationId() {
return this.applicationId;
}
@@ -64,4 +78,7 @@ public String getUser() {
return this.appAcls;
}
+ public LogContext getLogContext() {
+ return this.logContext;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index e6f39f6..37501c7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -29,6 +29,7 @@ message ContainerManagerApplicationProto {
optional string user = 2;
optional bytes credentials = 3;
repeated ApplicationACLMapProto acls = 4;
+ optional LogContextProto log_context = 5;
}
message DeletionServiceDeleteTaskProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index f2109b5..679faf6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -795,11 +796,20 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)
throws IOException {
+ return createContainerToken(cId, rmIdentifier, nodeId, user,
+ containerTokenSecretManager, null);
+ }
+
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user,
+ NMContainerTokenSecretManager containerTokenSecretManager,
+ LogContext logContext)
+ throws IOException {
Resource r = BuilderUtils.newResource(1024, 1);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
- Priority.newInstance(0), 0);
+ Priority.newInstance(0), 0, logContext);
Token containerToken =
BuilderUtils
.newContainerToken(nodeId, containerTokenSecretManager
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 0319664..3c98c7e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -27,8 +27,10 @@
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -45,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -58,6 +61,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@@ -72,6 +76,7 @@
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Test;
+
public class TestContainerManagerRecovery {
private NodeManagerMetrics metrics = NodeManagerMetrics.create();
@@ -126,8 +131,16 @@ public void testApplicationRecovery() throws Exception {
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls);
+ // create a not-null logContext
+ String[] includePatterns = { "syslog", "stdout" };
+ String[] excludePatterns = { "stderr", "log", "token" };
+ final int interval = 2000;
+ LogContext logContext =
+ LogContext.newInstance(
+ new HashSet(Arrays.asList(includePatterns)),
+ new HashSet(Arrays.asList(excludePatterns)), interval);
StartContainersResponse startResponse = startContainer(context, cm, cid,
- clc);
+ clc, logContext);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
@@ -157,6 +170,14 @@ public void testApplicationRecovery() throws Exception {
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
+
+ // check whether logContext is recovered correctly
+ LogContext recovered = ((ApplicationImpl)app).getLogContext();
+ assertNotNull(recovered);
+ assertEquals(recovered.getLogInterval(), logContext.getLogInterval());
+ assertEquals(recovered.getLogIncludePatterns(), logContext.getLogIncludePatterns());
+ assertEquals(recovered.getLogExcludePatterns(), logContext.getLogExcludePatterns());
+
waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess(
UserGroupInformation.createRemoteUser(modUser),
@@ -224,13 +245,13 @@ public void testApplicationRecovery() throws Exception {
private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid,
- ContainerLaunchContext clc) throws Exception {
+ ContainerLaunchContext clc, LogContext logContext) throws Exception {
UserGroupInformation user = UserGroupInformation.createRemoteUser(
cid.getApplicationAttemptId().toString());
StartContainerRequest scReq = StartContainerRequest.newInstance(
clc, TestContainerManager.createContainerToken(cid, 0,
context.getNodeId(), user.getShortUserName(),
- context.getContainerTokenSecretManager()));
+ context.getContainerTokenSecretManager(), logContext));
final List scReqList =
new ArrayList();
scReqList.add(scReq);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 5240789..95ce659 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -91,6 +91,7 @@
private Resource amResource;
private boolean unmanagedAM = true;
private boolean amRunning = false;
+ private ApplicationSubmissionContext appSubmissionContext = null;
protected List newlyAllocatedContainers =
new ArrayList();
@@ -133,7 +134,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
- ApplicationSubmissionContext appSubmissionContext =
+ appSubmissionContext =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
.getApplicationSubmissionContext();
if (appSubmissionContext != null) {
@@ -444,7 +445,8 @@ public ContainersAndNMTokensAllocation(List containerList,
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource(), container.getPriority(),
- rmContainer.getCreationTime()));
+ rmContainer.getCreationTime(), this.appSubmissionContext == null ?
+ null : this.appSubmissionContext.getLogContext()));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
index 13943f8..ecc6b14 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -177,6 +178,25 @@ public void run() {
public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority,
long createTime) {
+ return createContainerToken(containerId, nodeId, appSubmitter, capability,
+ priority, createTime, null);
+ }
+
+ /**
+ * Helper function for creating ContainerTokens
+ *
+ * @param containerId
+ * @param nodeId
+ * @param appSubmitter
+ * @param capability
+ * @param priority
+ * @param createTime
+ * @param logContext
+ * @return the container-token
+ */
+ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
+ String appSubmitter, Resource capability, Priority priority,
+ long createTime, LogContext logContext) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@@ -189,7 +209,8 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
new ContainerTokenIdentifier(containerId, nodeId.toString(),
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
.getMasterKey().getKeyId(),
- ResourceManager.getClusterTimeStamp(), priority, createTime);
+ ResourceManager.getClusterTimeStamp(), priority, createTime,
+ logContext);
password = this.createPassword(tokenIdentifier);
} finally {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 1338a6c..cfbd522 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -278,7 +279,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- false, null, 0);
+ false, null, 0, null);
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
@@ -287,9 +288,17 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
- false, null, attemptFailuresValidityInterval);
+ false, null, attemptFailuresValidityInterval, null);
}
+ public RMApp submitApp(int masterMemory, LogContext logContext)
+ throws Exception {
+ return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
+ .getShortUserName(), null, false, null,
+ super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
+ false, null, 0, logContext);
+ }
public RMApp submitApp(int masterMemory, String name, String user,
Map acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
@@ -297,14 +306,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
- isAppIdProvided, applicationId, 0);
+ isAppIdProvided, applicationId, 0, null);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
- ApplicationId applicationId, long attemptFailuresValidityInterval)
+ ApplicationId applicationId, long attemptFailuresValidityInterval,
+ LogContext logContext)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@@ -342,6 +352,9 @@ public RMApp submitApp(int masterMemory, String name, String user,
}
sub.setAMContainerSpec(clc);
sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
+ if (logContext != null) {
+ sub.setLogContext(logContext);
+ }
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index a9bfc2f..19492c4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -28,12 +30,14 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LogContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -47,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -165,6 +170,63 @@ public void testContainerTokenGeneratedOnPullRequest() throws Exception {
rm1.stop();
}
+ // This is to test whether logContext is passed into
+ // container tokens correctly
+ @Test
+ public void testLogContextPassedIntoContainerToken() throws Exception {
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+ MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000);
+ // logContext is set as null
+ Assert.assertNull(getLogContextFromContainerToken(rm1, nm1, null));
+
+ // create a not-null logContext
+ String[] includePatterns = { "syslog", "stdout" };
+ String[] excludePatterns = { "stderr", "log", "token" };
+ final int interval = 2000;
+ LogContext logContext =
+ LogContext.newInstance(
+ new HashSet(Arrays.asList(includePatterns)),
+ new HashSet(Arrays.asList(excludePatterns)), interval);
+ LogContext returned = getLogContextFromContainerToken(rm1, nm2, logContext);
+ Assert
+ .assertTrue(returned.getLogIncludePatterns().size() == includePatterns.length);
+ for (String pattern : includePatterns) {
+ Assert.assertTrue(returned.getLogIncludePatterns().contains(pattern));
+ }
+ Assert
+ .assertTrue(returned.getLogExcludePatterns().size() == excludePatterns.length);
+ for (String pattern : excludePatterns) {
+ Assert.assertTrue(returned.getLogExcludePatterns().contains(pattern));
+ }
+ Assert.assertTrue(returned.getLogInterval() == interval);
+ rm1.stop();
+ }
+
+ private LogContext getLogContextFromContainerToken(MockRM rm1, MockNM nm1,
+ LogContext logContext) throws Exception {
+ RMApp app2 = rm1.submitApp(200, logContext);
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+ nm1.nodeHeartbeat(true);
+ // request a container.
+ am2.allocate("127.0.0.1", 512, 1, new ArrayList());
+ ContainerId containerId =
+ ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
+
+ // acquire the container.
+ List containers =
+ am2.allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers();
+ Assert.assertEquals(containerId, containers.get(0).getId());
+ // container token is generated.
+ Assert.assertNotNull(containers.get(0).getContainerToken());
+ ContainerTokenIdentifier token =
+ BuilderUtils.newContainerTokenIdentifier(containers.get(0)
+ .getContainerToken());
+ return token.getLogContext();
+ }
@Test
public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
MockRM rm1 = new MockRM(conf);
@@ -210,10 +272,10 @@ protected RMContainerTokenSecretManager createContainerTokenSecretManager(
@Override
public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability,
- Priority priority, long createTime) {
+ Priority priority, long createTime, LogContext logContext) {
numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter,
- capability, priority, createTime);
+ capability, priority, createTime, logContext);
}
};
}