diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index ab283e7..926c757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -296,6 +296,8 @@ public static ReservationRequestInterpreter convertFromProtoFormat( * Log Aggregation Status */ private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_"; + private static final int LOG_AGGREGATION_STATUS_PREFIX_LEN = + LOG_AGGREGATION_STATUS_PREFIX.length(); public static LogAggregationStatusProto convertToProtoFormat( LogAggregationStatus e) { return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX @@ -304,8 +306,8 @@ public static LogAggregationStatusProto convertToProtoFormat( public static LogAggregationStatus convertFromProtoFormat( LogAggregationStatusProto e) { - return LogAggregationStatus.valueOf(e.name().replace( - LOG_AGGREGATION_STATUS_PREFIX, "")); + return LogAggregationStatus.valueOf(e.name().substring( + LOG_AGGREGATION_STATUS_PREFIX_LEN)); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9f00b2e..190b8a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -34,6 +34,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -177,7 +178,7 @@ private long logAggregationStartTime = 0; private final long logAggregationStatusTimeout; private final Map logAggregationStatus = - new HashMap(); + new ConcurrentHashMap(); private LogAggregationStatus logAggregationStatusForAppReport; private int logAggregationSucceed = 0; private int logAggregationFailed = 0; @@ -697,6 +698,8 @@ public int pullRMNodeUpdates(Collection updatedNodes) { public ApplicationReport createAndGetApplicationReport(String clientUserName, boolean allowAccess) { this.readLock.lock(); + LogAggregationStatus logAggregationStatusApp = null; + ApplicationReport report = null; try { ApplicationAttemptId currentApplicationAttemptId = null; @@ -704,7 +707,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, String trackingUrl = UNAVAILABLE; String host = UNAVAILABLE; String origTrackingUrl = UNAVAILABLE; - LogAggregationStatus logAggregationStatus = null; + int rpcPort = -1; ApplicationResourceUsageReport appUsageReport = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; @@ -735,7 +738,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, rpcPort = this.currentAttempt.getRpcPort(); appUsageReport = currentAttempt.getApplicationResourceUsageReport(); progress = currentAttempt.getProgress(); - logAggregationStatus = this.getLogAggregationStatusForAppReport(); + logAggregationStatusApp = this.getLogAggregationStatusForAppReport(); } //if the diagnostics is not already set get it from attempt diags = getDiagnostics().toString(); @@ -770,14 +773,14 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, DUMMY_APPLICATION_ATTEMPT_NUMBER); } - ApplicationReport report = BuilderUtils.newApplicationReport( + report = BuilderUtils.newApplicationReport( this.applicationId, currentApplicationAttemptId, this.user, this.queue, this.name, host, rpcPort, clientToAMToken, createApplicationState(), diags, trackingUrl, this.startTime, this.finishTime, finishState, appUsageReport, origTrackingUrl, progress, this.applicationType, amrmToken, applicationTags, this.getApplicationPriority()); - report.setLogAggregationStatus(logAggregationStatus); + report.setLogAggregationStatus(logAggregationStatusApp); report.setUnmanagedApp(submissionContext.getUnmanagedAM()); report.setAppNodeLabelExpression(getAppNodeLabelExpression()); report.setAmNodeLabelExpression(getAmNodeLabelExpression()); @@ -801,10 +804,24 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, } report.setApplicationTimeouts( Collections.singletonMap(timeout.getTimeoutType(), timeout)); - return report; } finally { this.readLock.unlock(); } + + if (logAggregationEnabled == true && + logAggregationStatusApp != this.logAggregationStatusForAppReport) { + if (LogAggregationStatus.FAILED == logAggregationStatusApp || + LogAggregationStatus.SUCCEEDED == logAggregationStatusApp || + LogAggregationStatus.TIME_OUT == logAggregationStatusApp) { + try { + this.writeLock.lock(); + this.logAggregationStatusForAppReport = logAggregationStatusApp; + } finally { + this.writeLock.unlock(); + } + } + } + return report; } private String getDefaultProxyTrackingUrl() { @@ -1697,26 +1714,22 @@ public ResourceRequest getAMResourceRequest() { public Map getLogAggregationReportsForApp() { try { this.readLock.lock(); - Map outputs = - new HashMap(); - outputs.putAll(logAggregationStatus); - if (!isLogAggregationFinished()) { - for (Entry output : outputs.entrySet()) { + if (!isLogAggregationFinished() && isAppInFinalState(this) && + System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + for (Entry output : logAggregationStatus.entrySet()) { if (!output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.TIME_OUT) && !output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.SUCCEEDED) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { + .equals(LogAggregationStatus.FAILED)) { output.getValue().setLogAggregationStatus( LogAggregationStatus.TIME_OUT); } } } - return outputs; + return Collections.unmodifiableMap(logAggregationStatus); } finally { this.readLock.unlock(); } @@ -1844,7 +1857,9 @@ private boolean isLogAggregationFinished() { return this.logAggregationStatusForAppReport .equals(LogAggregationStatus.SUCCEEDED) || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.FAILED); + .equals(LogAggregationStatus.FAILED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.TIME_OUT); }