From c6cf26c61b48a971bded649034232f5ab58be4a0 Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Thu, 5 Dec 2019 14:25:25 +0100 Subject: [PATCH] YARN-4029. Update LogAggregationStatus to store on finish --- .../hadoop/yarn/conf/YarnConfiguration.java | 9 +++ .../records/ApplicationStateData.java | 5 ++ .../impl/pb/ApplicationStateDataPBImpl.java | 17 ++++ .../resourcemanager/rmapp/RMAppImpl.java | 80 +++++++++++++++---- .../rmapp/RMAppLogAggregation.java | 22 ++++- ...yarn_server_resourcemanager_recovery.proto | 1 + 6 files changed, 116 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d23b6301efa..e8386481198 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1038,6 +1038,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; + /** + * Whether the state store should save the log aggregation status as well. + * Disabled by default because it puts extra load on ZK when enabled. 
+ */ + public static final String RM_STATE_STORE_SAVE_LOG_AGGREGATION_STATUS = + RM_PREFIX + "state-store.save-log-aggregation-status"; + public static final boolean + DEFAULT_RM_STATE_STORE_SAVE_LOG_AGGREGATION_STATUS = false; + /** Default application name */ public static final String DEFAULT_APPLICATION_NAME = "N/A"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 2b0bd2b5bad..e372341b857 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.Records; @@ -213,4 +214,8 @@ public abstract void setApplicationSubmissionContext( @Public public abstract void setApplicationTimeouts( Map applicationTimeouts); + + public abstract void setLogAggregationStatus(LogAggregationStatus status); + + public abstract LogAggregationStatus getLogAggregationStatus(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index f5cd107e173..e0a41f9ddaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -27,6 +27,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto; @@ -355,4 +356,20 @@ public void remove() { }; this.builder.addAllApplicationTimeouts(values); } + + @Override + public void setLogAggregationStatus(LogAggregationStatus status) { + maybeInitBuilder(); + if (status == null) { + builder.clearLogAggregationStatus(); + return; + } + builder.setLogAggregationStatus(ProtoUtils.convertToProtoFormat(status)); + } + + @Override + public LogAggregationStatus getLogAggregationStatus() { + ApplicationStateDataProtoOrBuilder p = viaProto ? 
proto : builder; + return ProtoUtils.convertFromProtoFormat(p.getLogAggregationStatus()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index c21d8d4e0bd..208375bff4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -177,6 +177,11 @@ new AppFinishedTransition(); private Set ranNodes = new ConcurrentSkipListSet(); + // ApplicationStateData related fields + private ApplicationStateData latestAppState; + private String appStateDiagnostics; + private RMAppState stateToBeStored; + private final RMAppLogAggregation logAggregation; private Map applicationTimeouts = new HashMap(); @@ -905,6 +910,11 @@ public void handle(RMAppEvent event) { public void recover(RMState state) { ApplicationStateData appState = state.getApplicationState().get(getApplicationId()); + // Update stored ApplicationStateData related data + this.latestAppState = appState; + this.appStateDiagnostics = appState.getDiagnostics(); + this.stateToBeStored = appState.getState(); + this.recoveredFinalState = appState.getState(); if (recoveredFinalState == null) { @@ -950,6 +960,8 @@ public void recover(RMState state) { if (currentAttempt != null) { nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1; } + appState.getLogAggregationStatus(); + // TODO logAggregation } private void createNewAttempt() { @@ -1029,16 +1041,11 @@ public void transition(RMAppImpl app, RMAppEvent event) { 
if(app.launchTime == 0) { LOG.info("update the launch time for applicationId: "+ - app.getApplicationId()+", attemptId: "+ - app.getCurrentAppAttempt().getAppAttemptId()+ - "launchTime: "+event.getTimestamp()); - ApplicationStateData appState = ApplicationStateData.newInstance( - app.submitTime, app.startTime, app.submissionContext, app.user, - app.callerContext); - appState.setApplicationTimeouts(app.getApplicationTimeouts()); - appState.setLaunchTime(event.getTimestamp()); - app.rmContext.getStateStore().updateApplicationState(appState, false); + app.getApplicationId()+", attemptId: "+ + app.getCurrentAppAttempt().getAppAttemptId()+ + ", launchTime: "+event.getTimestamp()); app.launchTime = event.getTimestamp(); + app.updateApplicationStateInStore(false); app.rmContext.getSystemMetricsPublisher().appLaunched( app, app.launchTime); } @@ -1296,14 +1303,10 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, default: break; } + this.appStateDiagnostics = diags; + this.stateToBeStored = stateToBeStored; - ApplicationStateData appState = - ApplicationStateData.newInstance(this.submitTime, this.startTime, - this.user, this.submissionContext, - stateToBeStored, diags, this.launchTime, this.storedFinishTime, - this.callerContext); - appState.setApplicationTimeouts(this.applicationTimeouts); - this.rmContext.getStateStore().updateApplicationState(appState); + updateApplicationStateInStore(true); } private static final class FinalSavingTransition extends RMAppTransition { @@ -1617,6 +1620,45 @@ private void removeExcessAttempts(RMAppImpl app) { } } + /** + * Stores the current state of this application in the state store. + * Reuses the record that was stored last, or creates one on first use. + * + * @param notifyApp passed through to the state store's update call + */ + private void updateApplicationStateInStore(boolean notifyApp) { + ApplicationStateData appState = latestAppState; + if (appState == null) { + // Not recovered and nothing stored so far: start from a new record. 
+ appState = ApplicationStateData.newInstance(this.submitTime, + this.startTime, this.submissionContext, this.user, + this.callerContext); + } + appState.setSubmitTime(this.submitTime); + appState.setStartTime(this.startTime); + appState.setUser(this.user); + appState.setApplicationSubmissionContext(this.submissionContext); + appState.setLaunchTime(this.launchTime); + appState.setCallerContext(this.callerContext); + 
appState.setApplicationTimeouts(this.applicationTimeouts); + + if (this.stateToBeStored != null) { + appState.setState(stateToBeStored); + } + if (this.appStateDiagnostics != null) { + appState.setDiagnostics(this.appStateDiagnostics); + } + if (this.storedFinishTime != 0) { + appState.setFinishTime(this.storedFinishTime); + } + if (logAggregation.shouldStoreStatus() && isLogAggregationFinished()) { + appState.setLogAggregationStatus(getLogAggregationStatusForAppReport()); + } + + this.rmContext.getStateStore().updateApplicationState(appState, notifyApp); + this.latestAppState = appState; + } + @Override public String getApplicationType() { return this.applicationType; @@ -1748,7 +1790,11 @@ public ReservationId getReservationId() { } public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { - logAggregation.aggregateLogReport(nodeId, report, this); + boolean storeLogAggregationStatus = + logAggregation.aggregateLogReport(nodeId, report, this); + if (storeLogAggregationStatus) { + updateApplicationStateInStore(true); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java index b4409ff8801..122ae53b383 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java @@ -53,6 +53,7 @@ private Map> logAggregationFailureMessagesForNMs = new HashMap<>(); private final int maxLogAggregationDiagnosticsInMemory; + private final 
boolean shouldStoreStatus; RMAppLogAggregation(Configuration conf, ReadLock readLock, WriteLock writeLock) { @@ -65,6 +66,7 @@ LogAggregationStatus.DISABLED; this.maxLogAggregationDiagnosticsInMemory = getMaxLogAggregationDiagnostics(conf); + this.shouldStoreStatus = storingInStateStoreEnabled(conf); } private long getLogAggregationStatusTimeout(Configuration conf) { @@ -89,6 +91,12 @@ private int getMaxLogAggregationDiagnostics(Configuration conf) { YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); } + private boolean storingInStateStoreEnabled(Configuration conf) { + return conf.getBoolean( + YarnConfiguration.RM_STATE_STORE_SAVE_LOG_AGGREGATION_STATUS, + YarnConfiguration.DEFAULT_RM_STATE_STORE_SAVE_LOG_AGGREGATION_STATUS); + } + Map getLogAggregationReportsForApp( RMAppImpl rmApp) { this.readLock.lock(); @@ -115,7 +123,13 @@ private int getMaxLogAggregationDiagnostics(Configuration conf) { } } - void aggregateLogReport(NodeId nodeId, LogAggregationReport report, + /** + * Aggregates log aggregation reports. 
+ * + * @return whether the log aggregation status is final + * and can be stored in the state store + */ + boolean aggregateLogReport(NodeId nodeId, LogAggregationReport report, RMAppImpl rmApp) { this.writeLock.lock(); try { @@ -157,11 +171,13 @@ void aggregateLogReport(NodeId nodeId, LogAggregationReport report, updateLogAggregationDiagnosticMessages(nodeId, report); if (RMAppImpl.isAppInFinalState(rmApp) && stateChangedToFinal) { updateLogAggregationStatus(nodeId); + return shouldStoreStatus; } } } finally { this.writeLock.unlock(); } + return false; } public LogAggregationStatus getLogAggregationStatusForAppReport( @@ -380,4 +396,8 @@ void addReportIfNecessary(NodeId nodeId, ApplicationId applicationId) { LogAggregationReport.newInstance(applicationId, status, "")); } } + + public boolean shouldStoreStatus() { + return shouldStoreStatus; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 8ac6615cd72..64df223b109 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -72,6 +72,7 @@ message ApplicationStateDataProto { optional hadoop.common.RPCCallerContextProto caller_context = 8; repeated ApplicationTimeoutMapProto application_timeouts = 9; optional int64 launch_time = 10; + optional LogAggregationStatusProto log_aggregation_status = 11 [default = LOG_NOT_START]; } message ApplicationAttemptStateDataProto { -- 2.21.0